diff --git a/docs-internal/engine/SQLITE_METRICS.md b/docs-internal/engine/SQLITE_METRICS.md index 1f89d52d86..0f66a812a5 100644 --- a/docs-internal/engine/SQLITE_METRICS.md +++ b/docs-internal/engine/SQLITE_METRICS.md @@ -6,8 +6,8 @@ - `sqlite_commit_dirty_page_count{path}`: Histogram of dirty page counts per commit path. - `sqlite_commit_dirty_bytes{path}`: Histogram of raw dirty-page bytes per commit path. - `sqlite_udb_ops_per_commit{path}`: Histogram of UniversalDB operations per commit path. -- `sqlite_commit_envoy_dispatch_duration_seconds`: Pegboard-envoy histogram for websocket frame arrival to `depot` dispatch. -- `sqlite_commit_envoy_response_duration_seconds`: Pegboard-envoy histogram for `depot` return to websocket response send. +- `envoy_sqlite_commit_dispatch_duration_seconds`: Pegboard-envoy histogram for websocket frame arrival to `depot` dispatch. +- `envoy_sqlite_commit_response_duration_seconds`: Pegboard-envoy histogram for `depot` return to websocket response send. - `sqlite_commit_phases`: Actor inspector labeled timing metric exposed from `/inspector/metrics`. Values are `request_build`, `serialize`, `transport`, and `state_update`. ## Scrape Points @@ -23,8 +23,8 @@ ## Diagnosis -- High `decode_request` or `sqlite_commit_envoy_dispatch_duration_seconds` usually means envoy-side validation or actor lookup is slow before storage work starts. +- High `decode_request` or `envoy_sqlite_commit_dispatch_duration_seconds` usually means envoy-side validation or actor lookup is slow before storage work starts. - High `meta_read` or `pidx_read` points at UniversalDB read pressure or cache misses. - High `ltx_encode` means commit encoding and compression are doing real work. Check dirty page counts and raw dirty bytes together. -- High `udb_write`, `meta_write`, or `sqlite_commit_envoy_response_duration_seconds` points at write-path latency after encode. +- High `udb_write`, `meta_write`, or `envoy_sqlite_commit_response_duration_seconds` points at write-path latency after encode. - A healthy actor should show non-zero `sqlite_commit_phases` totals after commits in `/inspector/metrics`. If SQL runs but those timings stay zero, the native VFS metrics path is broken. diff --git a/engine/packages/metrics/src/registry.rs b/engine/packages/metrics/src/registry.rs index c6bd268591..880cc495ae 100644 --- a/engine/packages/metrics/src/registry.rs +++ b/engine/packages/metrics/src/registry.rs @@ -1,7 +1,5 @@ use prometheus::*; lazy_static::lazy_static! { - pub static ref REGISTRY: Registry = Registry::new_custom( - Some("rivet".to_string()), - Some(labels! { })).unwrap(); + pub static ref REGISTRY: Registry = Registry::new_custom(None, Some(labels! { })).unwrap(); } diff --git a/engine/packages/pegboard-envoy/src/metrics.rs b/engine/packages/pegboard-envoy/src/metrics.rs index f292e69746..d7af4d9c9b 100644 --- a/engine/packages/pegboard-envoy/src/metrics.rs +++ b/engine/packages/pegboard-envoy/src/metrics.rs @@ -2,28 +2,28 @@ use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*}; lazy_static::lazy_static! { pub static ref CONNECTION_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_envoy_connection_total", + "envoy_connection_total", "Count of envoy connections opened.", &["namespace_id", "pool_name", "protocol_version"], *REGISTRY ).unwrap(); pub static ref EVICTION_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_envoy_eviction_total", + "envoy_eviction_total", "Count of envoy connections evicted.", &["namespace_id", "pool_name", "protocol_version"], *REGISTRY ).unwrap(); pub static ref CONNECTION_ACTIVE: IntGaugeVec = register_int_gauge_vec_with_registry!( - "pegboard_envoy_connection_active", + "envoy_connection_active", "Count of envoy connections currently active.", &["namespace_id", "pool_name", "protocol_version"], *REGISTRY ).unwrap(); pub static ref RECEIVE_INIT_PACKET_DURATION: HistogramVec = register_histogram_vec_with_registry!( - "pegboard_envoy_receive_init_packet_duration", + "envoy_receive_init_packet_duration", "Duration to receive the init packet for a envoy connection.", &["namespace_id", "pool_name"], BUCKETS.to_vec(), @@ -31,59 +31,59 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref EVENT_MULTIPLEXER_COUNT: IntGauge = register_int_gauge_with_registry!( - "pegboard_envoy_event_multiplexer_count", + "envoy_event_multiplexer_count", "Number of active actor event multiplexers.", *REGISTRY ).unwrap(); pub static ref INGESTED_EVENTS_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_envoy_ingested_events_total", + "envoy_ingested_events_total", "Count of actor events.", *REGISTRY ).unwrap(); pub static ref SQLITE_COMMIT_ENVOY_DISPATCH_DURATION: Histogram = register_histogram_with_registry!( - "sqlite_commit_envoy_dispatch_duration_seconds", + "envoy_sqlite_commit_dispatch_duration_seconds", "Duration from sqlite commit frame arrival until depot dispatch.", BUCKETS.to_vec(), *REGISTRY ).unwrap(); pub static ref SQLITE_COMMIT_ENVOY_RESPONSE_DURATION: Histogram = register_histogram_with_registry!( - "sqlite_commit_envoy_response_duration_seconds", + "envoy_sqlite_commit_response_duration_seconds", "Duration from depot commit return until the websocket response frame is sent.", BUCKETS.to_vec(), *REGISTRY ).unwrap(); pub static ref SQLITE_MIGRATION_ATTEMPTS_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_envoy_sqlite_migration_attempts_total", + "envoy_sqlite_migration_attempts_total", "Total number of sqlite v1 to v2 migration attempts.", *REGISTRY ).unwrap(); pub static ref SQLITE_MIGRATION_SUCCESSES_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_envoy_sqlite_migration_successes_total", + "envoy_sqlite_migration_successes_total", "Total number of sqlite v1 to v2 migrations that completed successfully.", *REGISTRY ).unwrap(); pub static ref SQLITE_MIGRATION_FAILURES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "pegboard_envoy_sqlite_migration_failures_total", + "envoy_sqlite_migration_failures_total", "Total number of sqlite v1 to v2 migration failures by phase.", &["phase"], *REGISTRY ).unwrap(); pub static ref SQLITE_MIGRATION_DURATION: Histogram = register_histogram_with_registry!( - "pegboard_envoy_sqlite_migration_duration_seconds", + "envoy_sqlite_migration_duration_seconds", "Duration of sqlite v1 to v2 migrations.", BUCKETS.to_vec(), *REGISTRY ).unwrap(); pub static ref SQLITE_MIGRATION_PAGES: Histogram = register_histogram_with_registry!( - "pegboard_envoy_sqlite_migration_pages", + "envoy_sqlite_migration_pages", "Number of pages imported during sqlite v1 to v2 migration.", BUCKETS.to_vec(), *REGISTRY diff --git a/engine/packages/pegboard/src/actor_kv/metrics.rs b/engine/packages/pegboard/src/actor_kv/metrics.rs index 7716a90342..4cb261605a 100644 --- a/engine/packages/pegboard/src/actor_kv/metrics.rs +++ b/engine/packages/pegboard/src/actor_kv/metrics.rs @@ -2,7 +2,7 @@ use rivet_metrics::{BUCKETS, REGISTRY, prometheus::*}; lazy_static::lazy_static! { pub static ref ACTOR_KV_OPERATION_DURATION: HistogramVec = register_histogram_vec_with_registry!( - "actor_kv_operation_duration_seconds", + "pegboard_actor_kv_operation_duration_seconds", "Duration of actor KV operations including UDB transaction.", &["op"], BUCKETS.to_vec(), @@ -10,7 +10,7 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref ACTOR_KV_KEYS_PER_OP: HistogramVec = register_histogram_vec_with_registry!( - "actor_kv_keys_per_operation", + "pegboard_actor_kv_keys_per_operation", "Number of keys per actor KV operation.", &["op"], vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0], diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index e67c096f3a..66867b00e0 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -224,18 +224,13 @@ impl ActorContext { name: String, key: ActorKey, region: String, - generation: Option, - envoy_key: String, + _generation: Option, + _envoy_key: String, config: ActorConfig, kv: Kv, sql: SqliteDb, ) -> Self { - let metrics = ActorMetrics::new( - actor_id.clone(), - generation, - format_actor_key(&key), - envoy_key, - ); + let metrics = ActorMetrics::new(name.clone()); #[cfg(feature = "sqlite-local")] let mut sql = sql; #[cfg(feature = "sqlite-local")] diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs index 6d5f6e11c3..c81dfe4af3 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs @@ -3,75 +3,46 @@ use std::sync::{Arc, LazyLock}; use std::time::Duration; use rivet_metrics::prometheus::{ - CounterVec, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, + CounterVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, }; -use scc::HashMap as SccHashMap; use crate::actor::task_types::{ShutdownKind, StateMutationReason, UserTaskKind}; -use crate::time::Instant; -const ACTOR_LABELS: &[&str] = &["actor_id_gen", "actor_key", "envoy_key"]; -const USER_TASK_LABELS: &[&str] = &["actor_id_gen", "actor_key", "envoy_key", "kind"]; -const SHUTDOWN_LABELS: &[&str] = &["actor_id_gen", "actor_key", "envoy_key", "reason"]; -const STATE_MUTATION_LABELS: &[&str] = &["actor_id_gen", "actor_key", "envoy_key", "reason"]; -const ACTOR_METRIC_RETENTION: Duration = Duration::from_secs(10 * 60); -const DIRECT_SHUTDOWN_LABELS: &[&str] = &[ - "actor_id_gen", - "actor_key", - "envoy_key", - "subsystem", - "operation", -]; +const ACTOR_LABELS: &[&str] = &["actor_name"]; +const USER_TASK_LABELS: &[&str] = &["actor_name", "kind"]; +const SHUTDOWN_LABELS: &[&str] = &["actor_name", "reason"]; +const STATE_MUTATION_LABELS: &[&str] = &["actor_name", "reason"]; +const DIRECT_SHUTDOWN_LABELS: &[&str] = &["actor_name", "subsystem", "operation"]; #[cfg(feature = "sqlite-local")] -const SQLITE_COMMIT_PHASE_LABELS: &[&str] = &["actor_id_gen", "actor_key", "envoy_key", "phase"]; +const SQLITE_COMMIT_PHASE_LABELS: &[&str] = &["actor_name", "phase"]; #[cfg(feature = "sqlite-local")] -const SQLITE_WORKER_COMMAND_LABELS: &[&str] = - &["actor_id_gen", "actor_key", "envoy_key", "operation"]; +const SQLITE_WORKER_COMMAND_LABELS: &[&str] = &["actor_name", "operation"]; #[cfg(feature = "sqlite-local")] -const SQLITE_WORKER_ERROR_LABELS: &[&str] = - &["actor_id_gen", "actor_key", "envoy_key", "operation", "code"]; +const SQLITE_WORKER_ERROR_LABELS: &[&str] = &["actor_name", "operation", "code"]; +#[derive(Clone)] pub(crate) struct ActorMetrics { - labels: Arc, + inner: Arc, } -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -struct ActorMetricLabels { - actor_id_gen: String, - actor_key: String, - envoy_key: String, +#[derive(Debug)] +struct ActorMetricInner { + labels: ActorMetricLabels, } -#[derive(Default)] -struct RetainedActorMetrics { - active_refs: usize, - expires_at: Option, - user_task_kinds: Vec<&'static str>, - shutdown_reasons: Vec<&'static str>, - state_mutation_reasons: Vec<&'static str>, - direct_shutdown_labels: Vec<(String, String)>, - #[cfg(feature = "sqlite-local")] - sqlite_commit_phases: Vec<&'static str>, - #[cfg(feature = "sqlite-local")] - sqlite_worker_operations: Vec<&'static str>, - #[cfg(feature = "sqlite-local")] - sqlite_worker_error_labels: Vec<(&'static str, &'static str)>, +#[derive(Debug)] +struct ActorMetricLabels { + actor_name: String, } struct ActorMetricCollectors { - actor_active: IntGaugeVec, - create_state_ms: GaugeVec, - create_vars_ms: GaugeVec, - queue_depth: IntGaugeVec, + actor_active_count: IntGaugeVec, + create_state_duration_seconds: HistogramVec, + create_vars_duration_seconds: HistogramVec, queue_messages_sent_total: IntCounterVec, queue_messages_received_total: IntCounterVec, - active_connections: IntGaugeVec, connections_total: IntCounterVec, - lifecycle_inbox_depth: IntGaugeVec, - dispatch_inbox_depth: IntGaugeVec, - lifecycle_event_inbox_depth: IntGaugeVec, - user_tasks_active: IntGaugeVec, user_task_duration_seconds: HistogramVec, shutdown_wait_seconds: HistogramVec, shutdown_timeout_total: CounterVec, @@ -104,8 +75,6 @@ struct ActorMetricCollectors { #[cfg(feature = "sqlite-local")] sqlite_vfs_commit_duration_seconds_total: CounterVec, #[cfg(feature = "sqlite-local")] - sqlite_worker_queue_depth: IntGaugeVec, - #[cfg(feature = "sqlite-local")] sqlite_worker_queue_overload_total: IntCounterVec, #[cfg(feature = "sqlite-local")] sqlite_worker_command_duration_seconds: HistogramVec, @@ -122,101 +91,57 @@ struct ActorMetricCollectors { } static METRICS: LazyLock = LazyLock::new(ActorMetricCollectors::new); -static RETAINED_ACTORS: LazyLock> = - LazyLock::new(SccHashMap::new); impl ActorMetricCollectors { fn new() -> Self { - let actor_active = IntGaugeVec::new( + let actor_active_count = IntGaugeVec::new( Opts::new( - "actor_active", - "whether an actor is currently active, retained briefly after shutdown", + "rivetkit_actor_active_count", + "current active actors in this process", ), ACTOR_LABELS, ) - .expect("create actor_active gauge"); - let create_state_ms = GaugeVec::new( - Opts::new( - "actor_create_state_ms", - "time spent creating typed actor state during startup", + .expect("create actor_active_count gauge"); + let create_state_duration_seconds = HistogramVec::new( + HistogramOpts::new( + "rivetkit_actor_create_state_duration_seconds", + "typed actor state creation time during startup in seconds", ), ACTOR_LABELS, ) - .expect("create actor_create_state_ms gauge"); - let create_vars_ms = GaugeVec::new( - Opts::new( - "actor_create_vars_ms", - "time spent creating typed actor vars during startup", + .expect("create actor_create_state_duration_seconds histogram"); + let create_vars_duration_seconds = HistogramVec::new( + HistogramOpts::new( + "rivetkit_actor_create_vars_duration_seconds", + "typed actor vars creation time during startup in seconds", ), ACTOR_LABELS, ) - .expect("create actor_create_vars_ms gauge"); - let queue_depth = IntGaugeVec::new( - Opts::new("actor_queue_depth", "current actor queue depth"), - ACTOR_LABELS, - ) - .expect("create actor_queue_depth gauge"); + .expect("create actor_create_vars_duration_seconds histogram"); let queue_messages_sent_total = IntCounterVec::new( - Opts::new("actor_queue_messages_sent_total", "total queue messages sent"), + Opts::new("rivetkit_actor_queue_messages_sent_total", "total queue messages sent"), ACTOR_LABELS, ) .expect("create actor_queue_messages_sent_total counter"); let queue_messages_received_total = IntCounterVec::new( Opts::new( - "actor_queue_messages_received_total", + "rivetkit_actor_queue_messages_received_total", "total queue messages received", ), ACTOR_LABELS, ) .expect("create actor_queue_messages_received_total counter"); - let active_connections = IntGaugeVec::new( - Opts::new( - "actor_active_connections", - "current active actor connections", - ), - ACTOR_LABELS, - ) - .expect("create actor_active_connections gauge"); let connections_total = IntCounterVec::new( Opts::new( - "actor_connections_total", + "rivetkit_actor_connections_total", "total successfully established actor connections", ), ACTOR_LABELS, ) .expect("create actor_connections_total counter"); - let lifecycle_inbox_depth = IntGaugeVec::new( - Opts::new( - "actor_lifecycle_inbox_depth", - "current actor lifecycle command inbox depth", - ), - ACTOR_LABELS, - ) - .expect("create actor_lifecycle_inbox_depth gauge"); - let dispatch_inbox_depth = IntGaugeVec::new( - Opts::new( - "actor_dispatch_inbox_depth", - "current actor dispatch command inbox depth", - ), - ACTOR_LABELS, - ) - .expect("create actor_dispatch_inbox_depth gauge"); - let lifecycle_event_inbox_depth = IntGaugeVec::new( - Opts::new( - "actor_lifecycle_event_inbox_depth", - "current actor lifecycle event inbox depth", - ), - ACTOR_LABELS, - ) - .expect("create actor_lifecycle_event_inbox_depth gauge"); - let user_tasks_active = IntGaugeVec::new( - Opts::new("actor_user_tasks_active", "current active actor user tasks"), - USER_TASK_LABELS, - ) - .expect("create actor_user_tasks_active gauge"); let user_task_duration_seconds = HistogramVec::new( HistogramOpts::new( - "actor_user_task_duration_seconds", + "rivetkit_actor_user_task_duration_seconds", "actor user task execution time in seconds", ), USER_TASK_LABELS, @@ -224,7 +149,7 @@ impl ActorMetricCollectors { .expect("create actor_user_task_duration_seconds histogram"); let shutdown_wait_seconds = HistogramVec::new( HistogramOpts::new( - "actor_shutdown_wait_seconds", + "rivetkit_actor_shutdown_wait_seconds", "actor shutdown wait time in seconds", ), SHUTDOWN_LABELS, @@ -232,20 +157,20 @@ impl ActorMetricCollectors { .expect("create actor_shutdown_wait_seconds histogram"); let shutdown_timeout_total = CounterVec::new( Opts::new( - "actor_shutdown_timeout_total", + "rivetkit_actor_shutdown_timeout_total", "total actor shutdown timeout events", ), SHUTDOWN_LABELS, ) .expect("create actor_shutdown_timeout_total counter"); let state_mutation_total = CounterVec::new( - Opts::new("actor_state_mutation_total", "total actor state mutations"), + Opts::new("rivetkit_actor_state_mutation_total", "total actor state mutations"), STATE_MUTATION_LABELS, ) .expect("create actor_state_mutation_total counter"); let direct_subsystem_shutdown_warning_total = CounterVec::new( Opts::new( - "actor_direct_subsystem_shutdown_warning_total", + "rivetkit_actor_direct_subsystem_shutdown_warning_total", "total actor shutdown warnings emitted by direct subsystem drains", ), DIRECT_SHUTDOWN_LABELS, @@ -255,7 +180,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_resolve_pages_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_resolve_pages_total", + "rivetkit_actor_sqlite_vfs_resolve_pages_total", "total VFS page resolution attempts", ), ACTOR_LABELS, @@ -264,7 +189,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_resolve_pages_requested_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_resolve_pages_requested_total", + "rivetkit_actor_sqlite_vfs_resolve_pages_requested_total", "total pages requested by VFS page resolution attempts", ), ACTOR_LABELS, @@ -273,7 +198,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_resolve_pages_cache_hits_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_resolve_pages_cache_hits_total", + "rivetkit_actor_sqlite_vfs_resolve_pages_cache_hits_total", "total pages resolved from the VFS page cache or write buffer", ), ACTOR_LABELS, @@ -282,7 +207,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_resolve_pages_cache_misses_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_resolve_pages_cache_misses_total", + "rivetkit_actor_sqlite_vfs_resolve_pages_cache_misses_total", "total pages missing from the VFS page cache and write buffer", ), ACTOR_LABELS, @@ -291,7 +216,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_get_pages_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_get_pages_total", + "rivetkit_actor_sqlite_vfs_get_pages_total", "total VFS to engine get_pages requests", ), ACTOR_LABELS, @@ -300,7 +225,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_pages_fetched_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_pages_fetched_total", + "rivetkit_actor_sqlite_vfs_pages_fetched_total", "total pages requested from the engine by VFS get_pages calls", ), ACTOR_LABELS, @@ -309,7 +234,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_prefetch_pages_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_prefetch_pages_total", + "rivetkit_actor_sqlite_vfs_prefetch_pages_total", "total pages requested speculatively by VFS prefetch", ), ACTOR_LABELS, @@ -318,7 +243,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_bytes_fetched_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_bytes_fetched_total", + "rivetkit_actor_sqlite_vfs_bytes_fetched_total", "total bytes requested from the engine by VFS get_pages calls", ), ACTOR_LABELS, @@ -327,7 +252,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_prefetch_bytes_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_prefetch_bytes_total", + "rivetkit_actor_sqlite_vfs_prefetch_bytes_total", "total bytes requested speculatively by VFS prefetch", ), ACTOR_LABELS, @@ -336,7 +261,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_get_pages_duration_seconds = HistogramVec::new( HistogramOpts::new( - "actor_sqlite_vfs_get_pages_duration_seconds", + "rivetkit_actor_sqlite_vfs_get_pages_duration_seconds", "VFS get_pages request duration in seconds", ) .buckets(vec![ @@ -348,7 +273,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_commit_total = IntCounterVec::new( Opts::new( - "actor_sqlite_vfs_commit_total", + "rivetkit_actor_sqlite_vfs_commit_total", "total successful VFS commits", ), ACTOR_LABELS, @@ -357,7 +282,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_commit_phase_duration_seconds_total = CounterVec::new( Opts::new( - "actor_sqlite_vfs_commit_phase_duration_seconds_total", + "rivetkit_actor_sqlite_vfs_commit_phase_duration_seconds_total", "cumulative VFS commit phase duration in seconds", ), SQLITE_COMMIT_PHASE_LABELS, @@ -366,25 +291,16 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_vfs_commit_duration_seconds_total = CounterVec::new( Opts::new( - "actor_sqlite_vfs_commit_duration_seconds_total", + "rivetkit_actor_sqlite_vfs_commit_duration_seconds_total", "cumulative VFS commit duration in seconds", ), SQLITE_COMMIT_PHASE_LABELS, ) .expect("create actor_sqlite_vfs_commit_duration_seconds_total counter"); #[cfg(feature = "sqlite-local")] - let sqlite_worker_queue_depth = IntGaugeVec::new( - Opts::new( - "actor_sqlite_worker_queue_depth", - "current native SQLite worker SQL command queue depth", - ), - ACTOR_LABELS, - ) - .expect("create actor_sqlite_worker_queue_depth gauge"); - #[cfg(feature = "sqlite-local")] let sqlite_worker_queue_overload_total = IntCounterVec::new( Opts::new( - "actor_sqlite_worker_queue_overload_total", + "rivetkit_actor_sqlite_worker_queue_overload_total", "total native SQLite worker SQL command queue overloads", ), ACTOR_LABELS, @@ -393,7 +309,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_command_duration_seconds = HistogramVec::new( HistogramOpts::new( - "actor_sqlite_worker_command_duration_seconds", + "rivetkit_actor_sqlite_worker_command_duration_seconds", "native SQLite worker SQL command duration in seconds", ) .buckets(sqlite_worker_duration_buckets()), @@ -403,7 +319,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_command_error_total = CounterVec::new( Opts::new( - "actor_sqlite_worker_command_error_total", + "rivetkit_actor_sqlite_worker_command_error_total", "total native SQLite worker SQL command errors", ), SQLITE_WORKER_ERROR_LABELS, @@ -412,7 +328,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_close_duration_seconds = HistogramVec::new( HistogramOpts::new( - "actor_sqlite_worker_close_duration_seconds", + "rivetkit_actor_sqlite_worker_close_duration_seconds", "native SQLite worker close duration in seconds", ) .buckets(sqlite_worker_duration_buckets()), @@ -422,7 +338,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_close_timeout_total = IntCounterVec::new( Opts::new( - "actor_sqlite_worker_close_timeout_total", + "rivetkit_actor_sqlite_worker_close_timeout_total", "total native SQLite worker close timeouts", ), ACTOR_LABELS, @@ -431,7 +347,7 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_crash_total = IntCounterVec::new( Opts::new( - "actor_sqlite_worker_crash_total", + "rivetkit_actor_sqlite_worker_crash_total", "total native SQLite worker crashes", ), ACTOR_LABELS, @@ -440,25 +356,25 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] let sqlite_worker_unclean_close_total = IntCounterVec::new( Opts::new( - "actor_sqlite_worker_unclean_close_total", + "rivetkit_actor_sqlite_worker_unclean_close_total", "total native SQLite worker channel drops without clean close", ), ACTOR_LABELS, ) .expect("create actor_sqlite_worker_unclean_close_total counter"); - register_metric(&rivet_metrics::REGISTRY, actor_active.clone()); - register_metric(&rivet_metrics::REGISTRY, create_state_ms.clone()); - register_metric(&rivet_metrics::REGISTRY, create_vars_ms.clone()); - register_metric(&rivet_metrics::REGISTRY, queue_depth.clone()); + register_metric(&rivet_metrics::REGISTRY, actor_active_count.clone()); + register_metric( + &rivet_metrics::REGISTRY, + create_state_duration_seconds.clone(), + ); + register_metric( + &rivet_metrics::REGISTRY, + create_vars_duration_seconds.clone(), + ); register_metric(&rivet_metrics::REGISTRY, queue_messages_sent_total.clone()); register_metric(&rivet_metrics::REGISTRY, queue_messages_received_total.clone()); - register_metric(&rivet_metrics::REGISTRY, active_connections.clone()); register_metric(&rivet_metrics::REGISTRY, connections_total.clone()); - register_metric(&rivet_metrics::REGISTRY, lifecycle_inbox_depth.clone()); - register_metric(&rivet_metrics::REGISTRY, dispatch_inbox_depth.clone()); - register_metric(&rivet_metrics::REGISTRY, lifecycle_event_inbox_depth.clone()); - register_metric(&rivet_metrics::REGISTRY, user_tasks_active.clone()); register_metric(&rivet_metrics::REGISTRY, user_task_duration_seconds.clone()); register_metric(&rivet_metrics::REGISTRY, shutdown_wait_seconds.clone()); register_metric(&rivet_metrics::REGISTRY, shutdown_timeout_total.clone()); @@ -500,7 +416,6 @@ impl ActorMetricCollectors { &rivet_metrics::REGISTRY, sqlite_vfs_commit_duration_seconds_total.clone(), ); - register_metric(&rivet_metrics::REGISTRY, sqlite_worker_queue_depth.clone()); register_metric(&rivet_metrics::REGISTRY, sqlite_worker_queue_overload_total.clone()); register_metric( &rivet_metrics::REGISTRY, @@ -514,18 +429,12 @@ impl ActorMetricCollectors { } Self { - actor_active, - create_state_ms, - create_vars_ms, - queue_depth, + actor_active_count, + create_state_duration_seconds, + create_vars_duration_seconds, queue_messages_sent_total, queue_messages_received_total, - active_connections, connections_total, - lifecycle_inbox_depth, - dispatch_inbox_depth, - lifecycle_event_inbox_depth, - user_tasks_active, user_task_duration_seconds, shutdown_wait_seconds, shutdown_timeout_total, @@ -558,8 +467,6 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] sqlite_vfs_commit_duration_seconds_total, #[cfg(feature = "sqlite-local")] - sqlite_worker_queue_depth, - #[cfg(feature = "sqlite-local")] sqlite_worker_queue_overload_total, #[cfg(feature = "sqlite-local")] sqlite_worker_command_duration_seconds, @@ -578,52 +485,42 @@ impl ActorMetricCollectors { } impl ActorMetrics { - pub(crate) fn new( - actor_id: impl Into, - generation: Option, - actor_key: impl Into, - envoy_key: impl Into, - ) -> Self { - let actor_id = actor_id.into(); - let labels = Arc::new(ActorMetricLabels { - actor_id_gen: generation - .map(|generation| format!("{actor_id}:{generation}")) - .unwrap_or_else(|| format!("{actor_id}:")), - actor_key: actor_key.into(), - envoy_key: envoy_key.into(), - }); - retain_actor_metrics(&labels); - Self { labels } + pub(crate) fn new(actor_name: impl Into) -> Self { + let labels = ActorMetricLabels { + actor_name: actor_name.into(), + }; + METRICS + .actor_active_count + .with_label_values(&labels.as_label_values()) + .inc(); + Self { + inner: Arc::new(ActorMetricInner { labels }), + } + } + + fn labels(&self) -> &ActorMetricLabels { + &self.inner.labels } - fn actor_labels(&self) -> [&str; 3] { - [ - self.labels.actor_id_gen.as_str(), - self.labels.actor_key.as_str(), - self.labels.envoy_key.as_str(), - ] + fn actor_labels(&self) -> [&str; 1] { + self.labels().as_label_values() } pub(crate) fn observe_create_state(&self, duration: Duration) { METRICS - .create_state_ms + .create_state_duration_seconds .with_label_values(&self.actor_labels()) - .set(duration_ms(duration)); + .observe(duration.as_secs_f64()); } pub(crate) fn observe_create_vars(&self, duration: Duration) { METRICS - .create_vars_ms + .create_vars_duration_seconds .with_label_values(&self.actor_labels()) - .set(duration_ms(duration)); + .observe(duration.as_secs_f64()); } - pub(crate) fn set_queue_depth(&self, depth: u32) { - METRICS - .queue_depth - .with_label_values(&self.actor_labels()) - .set(i64::from(depth)); - } + pub(crate) fn set_queue_depth(&self, _depth: u32) {} pub(crate) fn add_queue_messages_sent(&self, count: u64) { METRICS @@ -639,12 +536,7 @@ impl ActorMetrics { .inc_by(count); } - pub(crate) fn set_active_connections(&self, count: usize) { - METRICS - .active_connections - .with_label_values(&self.actor_labels()) - .set(count.try_into().unwrap_or(i64::MAX)); - } + pub(crate) fn set_active_connections(&self, _count: usize) {} pub(crate) fn inc_connections_total(&self) { METRICS @@ -653,105 +545,76 @@ impl ActorMetrics { .inc(); } - pub(crate) fn set_lifecycle_inbox_depth(&self, depth: usize) { - METRICS - .lifecycle_inbox_depth - .with_label_values(&self.actor_labels()) - .set(depth.try_into().unwrap_or(i64::MAX)); - } + pub(crate) fn set_lifecycle_inbox_depth(&self, _depth: usize) {} - pub(crate) fn set_dispatch_inbox_depth(&self, depth: usize) { - METRICS - .dispatch_inbox_depth - .with_label_values(&self.actor_labels()) - .set(depth.try_into().unwrap_or(i64::MAX)); - } + pub(crate) fn set_dispatch_inbox_depth(&self, _depth: usize) {} - pub(crate) fn set_lifecycle_event_inbox_depth(&self, depth: usize) { - METRICS - .lifecycle_event_inbox_depth - .with_label_values(&self.actor_labels()) - .set(depth.try_into().unwrap_or(i64::MAX)); - } + pub(crate) fn set_lifecycle_event_inbox_depth(&self, _depth: usize) {} - pub(crate) fn begin_user_task(&self, kind: UserTaskKind) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.user_task_kinds, kind.as_metric_label()); - }); - let labels = self.actor_labels(); - METRICS - .user_tasks_active - .with_label_values(&[labels[0], labels[1], labels[2], kind.as_metric_label()]) - .inc(); - } + pub(crate) fn begin_user_task(&self, _kind: UserTaskKind) {} pub(crate) fn end_user_task(&self, kind: UserTaskKind, duration: Duration) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.user_task_kinds, kind.as_metric_label()); - }); let labels = self.actor_labels(); - let labels = [labels[0], labels[1], labels[2], kind.as_metric_label()]; - METRICS - .user_tasks_active - .with_label_values(&labels) - .dec(); METRICS .user_task_duration_seconds - .with_label_values(&labels) + .with_label_values(&[labels[0], kind.as_metric_label()]) .observe(duration.as_secs_f64()); } pub(crate) fn observe_shutdown_wait(&self, reason: ShutdownKind, duration: Duration) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.shutdown_reasons, reason.as_metric_label()); - }); let labels = self.actor_labels(); METRICS .shutdown_wait_seconds - .with_label_values(&[labels[0], labels[1], labels[2], reason.as_metric_label()]) + .with_label_values(&[labels[0], reason.as_metric_label()]) .observe(duration.as_secs_f64()); } pub(crate) fn inc_shutdown_timeout(&self, reason: ShutdownKind) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.shutdown_reasons, reason.as_metric_label()); - }); let labels = self.actor_labels(); METRICS .shutdown_timeout_total - .with_label_values(&[labels[0], labels[1], labels[2], reason.as_metric_label()]) + .with_label_values(&[labels[0], reason.as_metric_label()]) .inc(); } pub(crate) fn inc_state_mutation(&self, reason: StateMutationReason) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique( - &mut retained.state_mutation_reasons, - reason.as_metric_label(), - ); - }); let labels = self.actor_labels(); METRICS .state_mutation_total - .with_label_values(&[labels[0], labels[1], labels[2], reason.as_metric_label()]) + .with_label_values(&[labels[0], reason.as_metric_label()]) .inc(); } pub(crate) fn inc_direct_subsystem_shutdown_warning(&self, subsystem: &str, operation: &str) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique( - &mut retained.direct_shutdown_labels, - (subsystem.to_owned(), operation.to_owned()), - ); - }); let labels = self.actor_labels(); METRICS .direct_subsystem_shutdown_warning_total - .with_label_values(&[labels[0], labels[1], labels[2], subsystem, operation]) + .with_label_values(&[labels[0], subsystem, operation]) .inc(); } } +impl Drop for ActorMetricInner { + fn drop(&mut self) { + METRICS + .actor_active_count + .with_label_values(&self.labels.as_label_values()) + .dec(); + } +} + +impl Default for ActorMetrics { + fn default() -> Self { + Self::new("") + } +} + +impl fmt::Debug for ActorMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ActorMetrics").finish() + } +} + #[cfg(feature = "sqlite-local")] impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { fn record_resolve_pages(&self, requested_pages: u64) { @@ -826,11 +689,6 @@ impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { state_update_ns: u64, total_ns: u64, ) { - record_retained_actor_metrics(&self.labels, |retained| { - for phase in ["request_build", "serialize", "transport", "state_update", "total"] { - push_unique(&mut retained.sqlite_commit_phases, phase); - } - }); let labels = self.actor_labels(); for (phase, duration_ns) in [ ("request_build", request_build_ns), @@ -840,21 +698,16 @@ impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { ] { METRICS .sqlite_vfs_commit_phase_duration_seconds_total - .with_label_values(&[labels[0], labels[1], labels[2], phase]) + .with_label_values(&[labels[0], phase]) .inc_by(ns_to_seconds(duration_ns)); } METRICS .sqlite_vfs_commit_duration_seconds_total - .with_label_values(&[labels[0], labels[1], labels[2], "total"]) + .with_label_values(&[labels[0], "total"]) .inc_by(ns_to_seconds(total_ns)); } - fn set_worker_queue_depth(&self, depth: u64) { - METRICS - .sqlite_worker_queue_depth - .with_label_values(&self.actor_labels()) - .set(depth as i64); - } + fn set_worker_queue_depth(&self, _depth: u64) {} fn record_worker_queue_overload(&self) { METRICS @@ -864,25 +717,18 @@ impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { } fn observe_worker_command_duration(&self, operation: &'static str, duration_ns: u64) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.sqlite_worker_operations, operation); - }); let labels = self.actor_labels(); METRICS .sqlite_worker_command_duration_seconds - .with_label_values(&[labels[0], labels[1], labels[2], operation]) + .with_label_values(&[labels[0], operation]) .observe(ns_to_seconds(duration_ns)); } fn record_worker_command_error(&self, operation: &'static str, code: &'static str) { - record_retained_actor_metrics(&self.labels, |retained| { - push_unique(&mut retained.sqlite_worker_operations, operation); - push_unique(&mut retained.sqlite_worker_error_labels, (operation, code)); - }); let labels = self.actor_labels(); METRICS .sqlite_worker_command_error_total - .with_label_values(&[labels[0], labels[1], labels[2], operation, code]) + .with_label_values(&[labels[0], operation, code]) .inc(); } @@ -915,36 +761,6 @@ impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { } } -impl Clone for ActorMetrics { - fn clone(&self) -> Self { - retain_actor_metrics(&self.labels); - Self { - labels: self.labels.clone(), - } - } -} - -impl Drop for ActorMetrics { - fn drop(&mut self) { - release_actor_metrics(&self.labels); - } -} - -impl Default for ActorMetrics { - fn default() -> Self { - Self::new("", None, "", "") - } -} - -impl fmt::Debug for ActorMetrics { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ActorMetrics").finish() - } -} - -fn duration_ms(duration: Duration) -> f64 { - duration.as_secs_f64() * 1000.0 -} #[cfg(feature = "sqlite-local")] fn ns_to_seconds(duration_ns: u64) -> f64 { @@ -958,210 +774,9 @@ fn sqlite_worker_duration_buckets() -> Vec { ] } -fn retain_actor_metrics(labels: &Arc) { - cleanup_expired_actor_metrics(Instant::now()); - let mut retained = RETAINED_ACTORS - .entry_sync(labels.as_ref().clone()) - .or_default(); - retained.active_refs = retained.active_refs.saturating_add(1); - retained.expires_at = None; - METRICS - .actor_active - .with_label_values(&labels.as_label_values()) - .set(1); -} - -fn release_actor_metrics(labels: &Arc) { - let now = Instant::now(); - let mut inactive = false; - if let Some(mut retained) = RETAINED_ACTORS.get_sync(labels.as_ref()) { - retained.active_refs = retained.active_refs.saturating_sub(1); - if retained.active_refs == 0 { - retained.expires_at = Some(now + ACTOR_METRIC_RETENTION); - inactive = true; - } - } - if inactive { - METRICS - .actor_active - .with_label_values(&labels.as_label_values()) - .set(0); - } - cleanup_expired_actor_metrics(now); -} - -fn record_retained_actor_metrics( - labels: &Arc, - record: impl FnOnce(&mut RetainedActorMetrics), -) { - let mut retained = RETAINED_ACTORS - .entry_sync(labels.as_ref().clone()) - .or_default(); - record(&mut retained); -} - -fn cleanup_expired_actor_metrics(now: Instant) { - RETAINED_ACTORS.retain_sync(|labels, retained| { - let expired = retained - .expires_at - .is_some_and(|expires_at| now >= expires_at); - if expired { - remove_retained_actor_metrics(labels, retained); - } - !expired - }); -} - -fn remove_retained_actor_metrics(labels: &ActorMetricLabels, retained: &RetainedActorMetrics) { - let actor_labels = labels.as_label_values(); - let metrics = &*METRICS; - macro_rules! remove_actor_labels { - ($($metric:ident),+ $(,)?) => { - $( - ignore_missing_labels(metrics.$metric.remove_label_values(&actor_labels)); - )+ - }; - } - - remove_actor_labels!( - actor_active, - create_state_ms, - create_vars_ms, - queue_depth, - queue_messages_sent_total, - queue_messages_received_total, - active_connections, - connections_total, - lifecycle_inbox_depth, - dispatch_inbox_depth, - lifecycle_event_inbox_depth, - ); - - for kind in &retained.user_task_kinds { - let labels = [actor_labels[0], actor_labels[1], actor_labels[2], *kind]; - ignore_missing_labels(metrics.user_tasks_active.remove_label_values(&labels)); - ignore_missing_labels( - metrics - .user_task_duration_seconds - .remove_label_values(&labels), - ); - } - for reason in &retained.shutdown_reasons { - let labels = [actor_labels[0], actor_labels[1], actor_labels[2], *reason]; - ignore_missing_labels(metrics.shutdown_wait_seconds.remove_label_values(&labels)); - ignore_missing_labels(metrics.shutdown_timeout_total.remove_label_values(&labels)); - } - for reason in &retained.state_mutation_reasons { - let labels = [actor_labels[0], actor_labels[1], actor_labels[2], *reason]; - ignore_missing_labels(metrics.state_mutation_total.remove_label_values(&labels)); - } - for (subsystem, operation) in &retained.direct_shutdown_labels { - let labels = [ - actor_labels[0], - actor_labels[1], - actor_labels[2], - subsystem.as_str(), - operation.as_str(), - ]; - ignore_missing_labels( - metrics - .direct_subsystem_shutdown_warning_total - .remove_label_values(&labels), - ); - } - - #[cfg(feature = "sqlite-local")] - { - remove_actor_labels!( - sqlite_vfs_resolve_pages_total, - sqlite_vfs_resolve_pages_requested_total, - sqlite_vfs_resolve_pages_cache_hits_total, - sqlite_vfs_resolve_pages_cache_misses_total, - sqlite_vfs_get_pages_total, - sqlite_vfs_pages_fetched_total, - sqlite_vfs_prefetch_pages_total, - sqlite_vfs_bytes_fetched_total, - sqlite_vfs_prefetch_bytes_total, - sqlite_vfs_get_pages_duration_seconds, - sqlite_vfs_commit_total, - sqlite_worker_queue_depth, - sqlite_worker_queue_overload_total, - sqlite_worker_close_duration_seconds, - sqlite_worker_close_timeout_total, - sqlite_worker_crash_total, - sqlite_worker_unclean_close_total, - ); - - for phase in &retained.sqlite_commit_phases { - let labels = [actor_labels[0], actor_labels[1], actor_labels[2], *phase]; - ignore_missing_labels( - metrics - .sqlite_vfs_commit_phase_duration_seconds_total - .remove_label_values(&labels), - ); - ignore_missing_labels( - metrics - .sqlite_vfs_commit_duration_seconds_total - .remove_label_values(&labels), - ); - } - for operation in &retained.sqlite_worker_operations { - let labels = [actor_labels[0], actor_labels[1], actor_labels[2], *operation]; - ignore_missing_labels( - metrics - .sqlite_worker_command_duration_seconds - .remove_label_values(&labels), - ); - } - for (operation, code) in &retained.sqlite_worker_error_labels { - let labels = [ - actor_labels[0], - actor_labels[1], - actor_labels[2], - *operation, - *code, - ]; - ignore_missing_labels( - metrics - .sqlite_worker_command_error_total - .remove_label_values(&labels), - ); - } - } -} - -fn push_unique(values: &mut Vec, value: T) { - if !values.contains(&value) { - values.push(value); - } -} - -fn ignore_missing_labels(result: rivet_metrics::prometheus::Result<()>) { - match result { - Ok(()) => {} - Err(error) if is_missing_labels_error(&error) => {} - Err(error) => { - tracing::debug!(?error, "failed to remove retained actor metric labels"); - } - } -} - -fn is_missing_labels_error(error: &rivet_metrics::prometheus::Error) -> bool { - matches!( - error, - rivet_metrics::prometheus::Error::Msg(message) - if message.starts_with("missing label values ") - || message.starts_with("missing labels ") - ) -} - impl ActorMetricLabels { - fn as_label_values(&self) -> [&str; 3] { - [ - self.actor_id_gen.as_str(), - self.actor_key.as_str(), - self.envoy_key.as_str(), - ] + fn as_label_values(&self) -> [&str; 1] { + [self.actor_name.as_str()] } } diff --git a/rivetkit-rust/packages/rivetkit-core/src/lib.rs b/rivetkit-rust/packages/rivetkit-core/src/lib.rs index 1adbcbb46e..ce1c24c972 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/lib.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/lib.rs @@ -11,7 +11,7 @@ pub mod actor; pub mod engine_process; pub mod error; pub mod inspector; -pub(crate) mod metrics_endpoint; +pub mod metrics_endpoint; pub mod registry; pub mod runtime; pub mod serverless; diff --git a/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs b/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs index dac3386e5c..2ed79fb928 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs @@ -7,17 +7,17 @@ use subtle::ConstantTimeEq; const METRICS_ENABLED_ENV: &str = "RIVETKIT_METRICS_ENABLED"; const METRICS_TOKEN_ENV: &str = "RIVETKIT_METRICS_TOKEN"; -pub(crate) struct RenderedMetrics { - pub(crate) content_type: String, - pub(crate) body: Vec, +pub struct RenderedMetrics { + pub content_type: String, + pub body: Vec, } -pub(crate) enum MetricsAccessError { +pub enum MetricsAccessError { NotEnabled, Unauthorized, } -pub(crate) fn authorize_metrics_request( +pub fn authorize_metrics_request( bearer_token: Option<&str>, ) -> std::result::Result<(), MetricsAccessError> { let Some(configured_token) = configured_metrics_token() else { @@ -35,7 +35,7 @@ pub(crate) fn authorize_metrics_request( } } -pub(crate) fn render_prometheus_metrics() -> Result { +pub fn render_prometheus_metrics() -> Result { let encoder = TextEncoder::new(); let metric_families = rivet_metrics::REGISTRY.gather(); let mut body = Vec::new(); @@ -49,14 +49,14 @@ pub(crate) fn render_prometheus_metrics() -> Result { }) } -pub(crate) fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> { +pub fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> { headers .get(http::header::AUTHORIZATION) .and_then(|value| value.to_str().ok()) .and_then(bearer_token_from_authorization) } -pub(crate) fn authorization_bearer_token_map(headers: &HashMap) -> Option<&str> { +pub fn authorization_bearer_token_map(headers: &HashMap) -> Option<&str> { headers .iter() .find(|(name, _)| name.eq_ignore_ascii_case(http::header::AUTHORIZATION.as_str())) diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index 838d8b9560..8b8e3222b5 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -477,22 +477,8 @@ fn handle_metrics_fetch(request: &Request) -> Result { return method_not_allowed_response(request, None); } - let bearer_token = crate::metrics_endpoint::authorization_bearer_token(request.headers()); - match crate::metrics_endpoint::authorize_metrics_request(bearer_token) { - Ok(()) => { - let metrics = crate::metrics_endpoint::render_prometheus_metrics()?; - bytes_response(StatusCode::OK, &metrics.content_type, metrics.body) - } - Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => { - text_response(StatusCode::FORBIDDEN, "metrics not enabled\n") - } - Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => { - text_response( - StatusCode::UNAUTHORIZED, - "metrics request requires a valid bearer token\n", - ) - } - } + let metrics = crate::metrics_endpoint::render_prometheus_metrics()?; + bytes_response(StatusCode::OK, &metrics.content_type, metrics.body) } fn handle_root_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Result { diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 3c98819220..b4ec06bd3b 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -20,6 +20,9 @@ use rivet_envoy_client::handle::EnvoyHandle; use rivet_envoy_client::protocol; use rivet_error::{ActorSpecifier, RivetError}; use rivetkit_client_protocol as client_protocol; +use rivetkit_shared_types::serverless_metadata::{ + ActorName, ServerlessMetadataEnvoy, ServerlessMetadataEnvoyKind, ServerlessMetadataPayload, +}; use scc::{HashMap as SccHashMap, hash_map::Entry as SccEntry}; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -81,6 +84,30 @@ pub struct CoreRegistry { factories: HashMap>, } +#[derive(Clone)] +pub struct CoreEnvoyHandle { + handle: EnvoyHandle, +} + +#[derive(Clone, Debug)] +pub struct CoreEnvoyStatus { + pub active_actor_count: usize, + pub ping_healthy: bool, +} + +impl CoreEnvoyHandle { + pub(crate) fn new(handle: EnvoyHandle) -> Self { + Self { handle } + } + + pub fn status(&self) -> CoreEnvoyStatus { + CoreEnvoyStatus { + active_actor_count: self.handle.active_actor_count(), + ping_healthy: self.handle.is_ping_healthy(), + } + } +} + #[derive(Clone)] struct ActorTaskHandle { actor_id: String, @@ -419,6 +446,22 @@ impl CoreRegistry { self.factories.insert(name.to_owned(), factory); } + pub fn normal_metadata_payload(&self, config: &ServeConfig) -> ServerlessMetadataPayload { + serverless_metadata_payload( + build_actor_metadata_map_from_factories(&self.factories), + config, + ServerlessMetadataEnvoyKind::Normal {}, + ) + } + + pub fn serverless_metadata_payload(&self, config: &ServeConfig) -> ServerlessMetadataPayload { + serverless_metadata_payload( + build_actor_metadata_map_from_factories(&self.factories), + config, + ServerlessMetadataEnvoyKind::Serverless {}, + ) + } + pub async fn serve(self, shutdown: CancellationToken) -> Result<()> { self.serve_with_config(ServeConfig::from_env(), shutdown) .await @@ -428,6 +471,16 @@ impl CoreRegistry { self, config: ServeConfig, shutdown: CancellationToken, + ) -> Result<()> { + self.serve_with_config_and_handle_observer(config, shutdown, |_| {}) + .await + } + + pub async fn serve_with_config_and_handle_observer( + self, + config: ServeConfig, + shutdown: CancellationToken, + on_handle: impl FnOnce(CoreEnvoyHandle) + Send + 'static, ) -> Result<()> { let dispatcher = self.into_dispatcher(&config); #[cfg(feature = "native-runtime")] @@ -468,6 +521,7 @@ impl CoreRegistry { callbacks, }) .await; + on_handle(CoreEnvoyHandle::new(handle.clone())); // Do not install `tokio::signal::ctrl_c()` here. It calls // `sigaction(SIGINT, ...)` at the POSIX level, which overrides the @@ -522,51 +576,90 @@ impl RegistryDispatcher { } pub(crate) fn build_actor_metadata_map(&self) -> HashMap { - self.factories - .iter() - .map(|(actor_name, factory)| { - let config = factory.config(); - let mut metadata = serde_json::Map::new(); - if let Some(icon) = &config.icon { - metadata.insert("icon".to_owned(), json!(icon)); - } - if let Some(name) = &config.name { - metadata.insert("name".to_owned(), json!(name)); - } - metadata.insert( - "preload".to_owned(), - json!({ - "keys": [ - [1], - [3], - [5, 1, 1], - [6], - ], - "prefixes": [ - { - "prefix": [6, 1], - "maxBytes": config.preload_max_workflow_bytes.unwrap_or(131_072), - "partial": false, - }, - { - "prefix": [2], - "maxBytes": config.preload_max_connections_bytes.unwrap_or(65_536), - "partial": false, - }, - { - "prefix": [5, 1, 2], - "maxBytes": 65_536, - "partial": false, - }, - ], - }), - ); - (actor_name.clone(), JsonValue::Object(metadata)) - }) - .collect() + build_actor_metadata_map_from_factories(&self.factories) + } +} + +pub(crate) fn serverless_metadata_payload( + actor_metadata: HashMap, + config: &ServeConfig, + envoy_kind: ServerlessMetadataEnvoyKind, +) -> ServerlessMetadataPayload { + let actor_names = actor_metadata + .into_iter() + .map(|(name, metadata)| { + ( + name, + ActorName { + metadata: Some(metadata), + }, + ) + }) + .collect::>(); + + ServerlessMetadataPayload { + runtime: "rivetkit".to_owned(), + version: config.serverless_package_version.clone(), + envoy_protocol_version: Some(protocol::PROTOCOL_VERSION), + actor_names, + envoy: Some(ServerlessMetadataEnvoy { + kind: Some(envoy_kind), + version: Some(config.version), + }), + runner: None, + client_endpoint: config.serverless_client_endpoint.clone(), + client_namespace: config.serverless_client_namespace.clone(), + client_token: config.serverless_client_token.clone(), } } +fn build_actor_metadata_map_from_factories( + factories: &HashMap>, +) -> HashMap { + factories + .iter() + .map(|(actor_name, factory)| { + let config = factory.config(); + let mut metadata = serde_json::Map::new(); + if let Some(icon) = &config.icon { + metadata.insert("icon".to_owned(), json!(icon)); + } + if let Some(name) = &config.name { + metadata.insert("name".to_owned(), json!(name)); + } + metadata.insert( + "preload".to_owned(), + json!({ + "keys": [ + [1], + [3], + [5, 1, 1], + [6], + ], + "prefixes": [ + { + "prefix": [6, 1], + "maxBytes": config.preload_max_workflow_bytes.unwrap_or(131_072), + "partial": false, + }, + { + "prefix": [2], + "maxBytes": config.preload_max_connections_bytes.unwrap_or(65_536), + "partial": false, + }, + { + "prefix": [5, 1, 2], + "maxBytes": 65_536, + "partial": false, + }, + ], + }), + ); + (actor_name.clone(), JsonValue::Object(metadata)) + }) + .collect() +} + impl RegistryDispatcher { async fn start_actor(self: &Arc, request: StartActorRequest) -> Result<()> { let startup_notify = Arc::new(Notify::new()); diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index 651fa62cd6..9e22dafceb 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -21,6 +21,7 @@ use url::Url; use crate::actor::factory::ActorFactory; #[cfg(feature = "native-runtime")] use crate::engine_process::EngineProcessManager; +use crate::registry::{CoreEnvoyHandle, CoreEnvoyStatus}; use crate::registry::{RegistryCallbacks, RegistryDispatcher, ServeConfig}; use crate::runtime::RuntimeSpawner; use crate::time::{sleep, timeout}; @@ -217,11 +218,17 @@ impl CoreServerlessRuntime { } pub async fn active_envoy_actor_count(&self) -> Option { + self.active_envoy_status() + .await + .map(|status| status.active_actor_count) + } + + pub async fn active_envoy_status(&self) -> Option { self.envoy .lock() .await .as_ref() - .map(EnvoyHandle::active_actor_count) + .map(|handle| CoreEnvoyHandle::new(handle.clone()).status()) } pub async fn handle_request(&self, req: ServerlessRequest) -> ServerlessResponse { diff --git a/rivetkit-rust/packages/rivetkit-core/tests/context.rs b/rivetkit-rust/packages/rivetkit-core/tests/context.rs index 1cccea3680..097b58e67b 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/context.rs @@ -327,6 +327,7 @@ mod moved_tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: std::sync::atomic::AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(i64::MAX), stopped_tx: tokio::sync::watch::channel(true).0, }); shared @@ -373,6 +374,7 @@ mod moved_tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: std::sync::atomic::AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(i64::MAX), stopped_tx: tokio::sync::watch::channel(true).0, }); EnvoyHandle::from_shared(shared) diff --git a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs index 6a636edf35..8346464f47 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs @@ -5,6 +5,7 @@ mod metrics_helpers; mod moved_tests { use std::panic::{AssertUnwindSafe, catch_unwind}; + use std::time::Duration; use rivet_metrics::prometheus::{IntGauge, Opts, Registry}; @@ -42,44 +43,38 @@ mod moved_tests { } #[test] - fn missing_label_filter_keeps_unexpected_prometheus_errors() { - use rivet_metrics::prometheus::Error; + fn actor_startup_duration_metrics_render() { + let actor_name = "counter-startup"; + let metrics = ActorMetrics::new(actor_name); - assert!(is_missing_labels_error(&Error::Msg( - "missing label values [\"actor\"]".to_owned(), - ))); - assert!(!is_missing_labels_error(&Error::InconsistentCardinality { - expect: 3, - got: 2, - })); - assert!(!is_missing_labels_error(&Error::Msg( - "unexpected metric error".to_owned(), - ))); - } - - #[test] - fn actor_inbox_depth_metrics_render() { - let metrics = ActorMetrics::new("actor-inbox-depth", Some(42), "counter/main", "envoy-1"); - - metrics.set_lifecycle_inbox_depth(1); - metrics.set_dispatch_inbox_depth(2); - metrics.set_lifecycle_event_inbox_depth(3); + metrics.observe_create_state(Duration::from_millis(10)); + metrics.observe_create_vars(Duration::from_millis(20)); let rendered = render_global_metrics(); - assert_metric_value(&rendered, "rivet_actor_lifecycle_inbox_depth", "1"); - assert_metric_value(&rendered, "rivet_actor_dispatch_inbox_depth", "2"); - assert_metric_value(&rendered, "rivet_actor_lifecycle_event_inbox_depth", "3"); + assert_metric_value( + &rendered, + "rivetkit_actor_create_state_duration_seconds_count", + actor_name, + "1", + ); + assert_metric_value( + &rendered, + "rivetkit_actor_create_vars_duration_seconds_count", + actor_name, + "1", + ); } #[test] - fn actor_active_metric_is_retained_after_drop() { - let metrics = ActorMetrics::new("actor-retention", Some(7), "counter/main", "envoy-1"); + fn actor_active_count_tracks_metric_lifetime() { + let actor_name = "counter-active"; + let metrics = ActorMetrics::new(actor_name); let rendered = render_global_metrics(); let line = rendered .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_active", "actor-retention:7")) - .expect("active actor metric should render"); + .find(|line| metric_line_for_actor(line, "rivetkit_actor_active_count", actor_name)) + .expect("active actor count metric should render"); assert!(line.ends_with('1'), "actor should be active: {line}"); drop(metrics); @@ -87,19 +82,17 @@ mod moved_tests { let rendered = render_global_metrics(); let line = rendered .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_active", "actor-retention:7")) - .expect("inactive actor metric should remain during retention window"); + .find(|line| metric_line_for_actor(line, "rivetkit_actor_active_count", actor_name)) + .expect("inactive actor count metric should remain"); assert!(line.ends_with('0'), "actor should be inactive: {line}"); } - fn assert_metric_value(metrics: &str, name: &str, value: &str) { + fn assert_metric_value(metrics: &str, name: &str, actor_name: &str, value: &str) { let line = metrics .lines() .find(|line| { line.starts_with(name) - && line.contains("actor_id_gen=\"actor-inbox-depth:42\"") - && line.contains("actor_key=\"counter/main\"") - && line.contains("envoy_key=\"envoy-1\"") + && line.contains(&format!("actor_name=\"{actor_name}\"")) }) .unwrap_or_else(|| panic!("{name} should render")); assert!(line.ends_with(value), "{name} should have value {value}: {line}"); diff --git a/rivetkit-rust/packages/rivetkit-core/tests/metrics_helpers.rs b/rivetkit-rust/packages/rivetkit-core/tests/metrics_helpers.rs index ceb6c5c8dc..1bff6771d9 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/metrics_helpers.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/metrics_helpers.rs @@ -10,6 +10,6 @@ pub(crate) fn render_global_metrics() -> String { String::from_utf8(encoded).expect("metrics should be utf-8") } -pub(crate) fn metric_line_for_actor(line: &str, name: &str, actor_id_gen: &str) -> bool { - line.starts_with(name) && line.contains(&format!("actor_id_gen=\"{actor_id_gen}\"")) +pub(crate) fn metric_line_for_actor(line: &str, name: &str, actor_name: &str) -> bool { + line.starts_with(name) && line.contains(&format!("actor_name=\"{actor_name}\"")) } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/modules/connection.rs b/rivetkit-rust/packages/rivetkit-core/tests/modules/connection.rs index c3c77ed4fc..01bb70c963 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/modules/connection.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/modules/connection.rs @@ -266,16 +266,11 @@ mod moved_tests { conn.disconnect(None).await?; let metrics = render_global_metrics(); - let active_line = metrics - .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_active_connections", "conn-metrics-actor:")) - .expect("active connections metric line"); let total_line = metrics .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_connections_total", "conn-metrics-actor:")) + .find(|line| metric_line_for_actor(line, "rivetkit_actor_connections_total", "conn-metrics")) .expect("connections total metric line"); - assert!(active_line.ends_with(" 0")); assert!(total_line.ends_with(" 1")); Ok(()) } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/modules/queue.rs b/rivetkit-rust/packages/rivetkit-core/tests/modules/queue.rs index 55af8f5ef5..85f886d692 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/modules/queue.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/modules/queue.rs @@ -150,20 +150,15 @@ mod moved_tests { let metrics = render_global_metrics(); let sent_line = metrics .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_queue_messages_sent_total", "queue-metrics-actor:")) + .find(|line| metric_line_for_actor(line, "rivetkit_actor_queue_messages_sent_total", "queue-metrics")) .expect("sent metric line"); let received_line = metrics .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_queue_messages_received_total", "queue-metrics-actor:")) + .find(|line| metric_line_for_actor(line, "rivetkit_actor_queue_messages_received_total", "queue-metrics")) .expect("received metric line"); - let depth_line = metrics - .lines() - .find(|line| metric_line_for_actor(line, "rivet_actor_queue_depth", "queue-metrics-actor:")) - .expect("depth metric line"); assert!(sent_line.ends_with(" 1")); assert!(received_line.ends_with(" 1")); - assert!(depth_line.ends_with(" 0")); } #[tokio::test] diff --git a/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs b/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs index 81cf3c6abb..bb7456578c 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs @@ -96,6 +96,7 @@ mod moved_tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(i64::MAX), stopped_tx: tokio::sync::watch::channel(true).0, }); diff --git a/rivetkit-rust/packages/rivetkit-core/tests/sqlite.rs b/rivetkit-rust/packages/rivetkit-core/tests/sqlite.rs index 174c327657..4d6d1332d4 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/sqlite.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/sqlite.rs @@ -173,6 +173,7 @@ fn test_envoy_handle() -> (EnvoyHandle, mpsc::UnboundedReceiver) ws_tx: Arc::new(AsyncMutex::new(None::>)), protocol_metadata: Arc::new(AsyncMutex::new(None)), shutting_down: AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(i64::MAX), stopped_tx: tokio::sync::watch::channel(true).0, }); diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 6b7e2e0b9f..56fa349052 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -254,6 +254,7 @@ mod moved_tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(i64::MAX), stopped_tx: tokio::sync::watch::channel(true).0, }); diff --git a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts index 2cba08a1c7..8ad23ac0ae 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-napi/index.d.ts @@ -174,9 +174,10 @@ export interface JsServerlessResponseHead { status: number headers: Record } -export interface JsRegistryDiagnostics { - mode: string - envoyActiveActorCount?: number +export interface JsRegistryRouteResponse { + status: number + headers: Record + body: Buffer } export interface JsServerlessStreamError { group: string @@ -313,9 +314,11 @@ export declare class CoreRegistry { * Idempotent. Safe to call when neither mode has been activated. * Does not block on the `serve()` future; TS awaits that promise * separately to avoid re-entrancy. - */ + */ shutdown(): Promise - diagnostics(): Promise + health(): Promise + metadata(): JsRegistryRouteResponse + metrics(): JsRegistryRouteResponse handleServerlessRequest(req: JsServerlessRequest, onStreamEvent: (...args: any[]) => any, cancelToken: CancellationToken, config: JsServeConfig): Promise } export declare class Schedule { diff --git a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs index 5f720d3232..5dbccf6948 100644 --- a/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs +++ b/rivetkit-typescript/packages/rivetkit-napi/src/registry.rs @@ -6,9 +6,10 @@ use napi::JsObject; use napi::bindgen_prelude::{Buffer, Env, Promise}; use napi::threadsafe_function::{ErrorStrategy, ThreadSafeCallContext, ThreadsafeFunction}; use napi_derive::napi; +use parking_lot::Mutex as ParkingMutex; use rivetkit_core::{ CoreRegistry as NativeCoreRegistry, CoreServerlessRuntime, ServeConfig, ServerlessRequest, - serverless::ServerlessStreamError, + registry::CoreEnvoyHandle, serverless::ServerlessStreamError, }; use tokio::sync::{Mutex as TokioMutex, Notify}; use tokio_util::sync::CancellationToken as CoreCancellationToken; @@ -50,9 +51,11 @@ pub struct JsServerlessResponseHead { } #[napi(object)] -pub struct JsRegistryDiagnostics { - pub mode: String, - pub envoy_active_actor_count: Option, +#[derive(Clone)] +pub struct JsRegistryRouteResponse { + pub status: u16, + pub headers: HashMap, + pub body: Buffer, } #[napi(object)] @@ -95,6 +98,9 @@ enum RegistryState { #[derive(Clone)] pub struct CoreRegistry { state: Arc>, + serving_envoy: Arc>>, + route_metadata: Arc>>, + route_package_version: Arc>>, shutdown_token: CoreCancellationToken, /// Notified whenever the state transitions out of `BuildingServerless` /// (to `Serverless(_)` on success, or `ShutDown` on failure/shutdown). @@ -114,6 +120,9 @@ impl CoreRegistry { state: Arc::new(TokioMutex::new(RegistryState::Registering( NativeCoreRegistry::new(), ))), + serving_envoy: Arc::new(ParkingMutex::new(None)), + route_metadata: Arc::new(ParkingMutex::new(None)), + route_package_version: Arc::new(ParkingMutex::new(None)), shutdown_token: CoreCancellationToken::new(), build_complete: Arc::new(Notify::new()), } @@ -165,32 +174,25 @@ impl CoreRegistry { } }; - registry - .serve_with_config( - ServeConfig { - version: config.version, - endpoint: config.endpoint, - token: config.token, - namespace: config.namespace, - pool_name: config.pool_name, - engine_binary_path: config.engine_binary_path.map(PathBuf::from), - handle_inspector_http_in_runtime: config - .handle_inspector_http_in_runtime - .unwrap_or(false), - serverless_base_path: config.serverless_base_path, - serverless_package_version: config.serverless_package_version, - serverless_client_endpoint: config.serverless_client_endpoint, - serverless_client_namespace: config.serverless_client_namespace, - serverless_client_token: config.serverless_client_token, - serverless_validate_endpoint: config.serverless_validate_endpoint, - serverless_max_start_payload_bytes: config.serverless_max_start_payload_bytes - as usize, - serverless_cache_envoy: true, - }, + let serve_config = serve_config_from_js(config, false, true); + *self.route_package_version.lock() = Some(serve_config.serverless_package_version.clone()); + *self.route_metadata.lock() = Some(metadata_response( + registry.normal_metadata_payload(&serve_config), + )?); + + *self.serving_envoy.lock() = None; + let serving_envoy = self.serving_envoy.clone(); + let result = registry + .serve_with_config_and_handle_observer( + serve_config, self.shutdown_token.clone(), + move |handle| { + *serving_envoy.lock() = Some(handle); + }, ) - .await - .map_err(napi_anyhow_error) + .await; + *self.serving_envoy.lock() = None; + result.map_err(napi_anyhow_error) } /// Trip the shutdown token and tear down any live serverless runtime. @@ -242,38 +244,76 @@ impl CoreRegistry { } #[napi] - pub async fn diagnostics(&self) -> napi::Result { - let guard = self.state.lock().await; - let diagnostics = match &*guard { - RegistryState::Registering(_) => JsRegistryDiagnostics { - mode: "registering".to_owned(), - envoy_active_actor_count: None, - }, - RegistryState::BuildingServerless => JsRegistryDiagnostics { - mode: "building_serverless".to_owned(), - envoy_active_actor_count: None, - }, - RegistryState::Serving => JsRegistryDiagnostics { - mode: "serving".to_owned(), - envoy_active_actor_count: None, - }, - RegistryState::Serverless(runtime) => JsRegistryDiagnostics { - mode: "serverless".to_owned(), - envoy_active_actor_count: runtime - .active_envoy_actor_count() - .await - .map(|count| count as u32), - }, - RegistryState::ShuttingDown => JsRegistryDiagnostics { - mode: "shutting_down".to_owned(), - envoy_active_actor_count: None, - }, - RegistryState::ShutDown => JsRegistryDiagnostics { - mode: "shut_down".to_owned(), - envoy_active_actor_count: None, - }, + pub async fn health(&self) -> napi::Result { + let version = self.route_package_version(); + let serverless_runtime = { + let guard = self.state.lock().await; + match &*guard { + RegistryState::Registering(_) => { + return Ok(health_response(503, "not_started", &version)); + } + RegistryState::BuildingServerless => { + return Ok(health_response(503, "starting", &version)); + } + RegistryState::Serving => match self + .serving_envoy + .lock() + .as_ref() + .map(CoreEnvoyHandle::status) + { + Some(envoy) => { + return Ok(health_response( + if envoy.ping_healthy { 200 } else { 503 }, + if envoy.ping_healthy { "ok" } else { "engine_ping_stale" }, + &version, + )); + } + None => return Ok(health_response(503, "starting", &version)), + } + RegistryState::Serverless(runtime) => runtime.clone(), + RegistryState::ShuttingDown => { + return Ok(health_response(503, "shutting_down", &version)); + } + RegistryState::ShutDown => { + return Ok(health_response(503, "shut_down", &version)); + } + } + }; + + let response = match serverless_runtime.active_envoy_status().await { + Some(envoy) => health_response( + if envoy.ping_healthy { 200 } else { 503 }, + if envoy.ping_healthy { "ok" } else { "engine_ping_stale" }, + &version, + ), + None => health_response(200, "ok", &version), }; - Ok(diagnostics) + Ok(response) + } + + #[napi] + pub fn metadata(&self) -> napi::Result { + self.route_metadata.lock().clone().ok_or_else(|| { + napi_anyhow_error( + NapiInvalidState { + state: "metadata_unavailable".to_owned(), + reason: "registry metadata is not available until the registry has started" + .to_owned(), + } + .build(), + ) + }) + } + + #[napi] + pub fn metrics(&self) -> napi::Result { + let metrics = + rivetkit_core::metrics_endpoint::render_prometheus_metrics().map_err(napi_anyhow_error)?; + Ok(JsRegistryRouteResponse { + status: 200, + headers: HashMap::from([("content-type".to_owned(), metrics.content_type)]), + body: Buffer::from(metrics.body), + }) } #[napi(ts_return_type = "Promise")] @@ -401,27 +441,14 @@ impl CoreRegistry { registry: NativeCoreRegistry, config: JsServeConfig, ) -> napi::Result { + let serve_config = serve_config_from_js(config, true, true); + *self.route_package_version.lock() = Some(serve_config.serverless_package_version.clone()); + *self.route_metadata.lock() = Some(metadata_response( + registry.serverless_metadata_payload(&serve_config), + )?); + let build_result = registry - .into_serverless_runtime(ServeConfig { - version: config.version, - endpoint: config.endpoint, - token: config.token, - namespace: config.namespace, - pool_name: config.pool_name, - engine_binary_path: config.engine_binary_path.map(PathBuf::from), - handle_inspector_http_in_runtime: config - .handle_inspector_http_in_runtime - .unwrap_or(true), - serverless_base_path: config.serverless_base_path, - serverless_package_version: config.serverless_package_version, - serverless_client_endpoint: config.serverless_client_endpoint, - serverless_client_namespace: config.serverless_client_namespace, - serverless_client_token: config.serverless_client_token, - serverless_validate_endpoint: config.serverless_validate_endpoint, - serverless_max_start_payload_bytes: config.serverless_max_start_payload_bytes - as usize, - serverless_cache_envoy: true, - }) + .into_serverless_runtime(serve_config) .await; // Re-acquire the lock and re-check state. Shutdown may have run during @@ -465,6 +492,13 @@ impl CoreRegistry { self.build_complete.notify_waiters(); result } + + fn route_package_version(&self) -> String { + self.route_package_version + .lock() + .clone() + .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_owned()) + } } fn create_stream_event_tsfn( @@ -513,6 +547,58 @@ impl From for JsServerlessStreamError { } } +fn health_response(status_code: u16, status: &str, version: &str) -> JsRegistryRouteResponse { + let body = serde_json::json!({ + "status": status, + "runtime": "rivetkit", + "version": version, + }); + + JsRegistryRouteResponse { + status: status_code, + headers: HashMap::from([( + "content-type".to_owned(), + "application/json".to_owned(), + )]), + body: Buffer::from(serde_json::to_vec(&body).unwrap_or_else(|_| b"{}".to_vec())), + } +} + +fn metadata_response(payload: impl serde::Serialize) -> napi::Result { + let body = serde_json::to_vec(&payload).map_err(|error| napi_anyhow_error(error.into()))?; + Ok(JsRegistryRouteResponse { + status: 200, + headers: HashMap::from([("content-type".to_owned(), "application/json".to_owned())]), + body: Buffer::from(body), + }) +} + +fn serve_config_from_js( + config: JsServeConfig, + default_handle_inspector_http_in_runtime: bool, + serverless_cache_envoy: bool, +) -> ServeConfig { + ServeConfig { + version: config.version, + endpoint: config.endpoint, + token: config.token, + namespace: config.namespace, + pool_name: config.pool_name, + engine_binary_path: config.engine_binary_path.map(PathBuf::from), + handle_inspector_http_in_runtime: config + .handle_inspector_http_in_runtime + .unwrap_or(default_handle_inspector_http_in_runtime), + serverless_base_path: config.serverless_base_path, + serverless_package_version: config.serverless_package_version, + serverless_client_endpoint: config.serverless_client_endpoint, + serverless_client_namespace: config.serverless_client_namespace, + serverless_client_token: config.serverless_client_token, + serverless_validate_endpoint: config.serverless_validate_endpoint, + serverless_max_start_payload_bytes: config.serverless_max_start_payload_bytes as usize, + serverless_cache_envoy, + } +} + fn registry_not_registering_error() -> napi::Error { napi_anyhow_error( NapiInvalidState { diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index 96e2e73c7c..483f7d1ac4 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -22,13 +22,15 @@ export interface ServerlessHandler { fetch: FetchHandler; } -export interface RegistryDiagnostics { - mode: string; - envoyActiveActorCount?: number | null; +export interface RegistryRoutes { + health(): Promise; + metadata(): Promise; + prometheusMetrics(request?: Request): Promise; } export class Registry { #config: RegistryConfigInput; + public readonly routes: RegistryRoutes; get config(): RegistryConfigInput { return this.#config; @@ -49,6 +51,12 @@ export class Registry { constructor(config: RegistryConfigInput) { this.#config = config; + this.routes = { + health: () => this.#healthRoute(), + metadata: () => this.#metadataRoute(), + prometheusMetrics: (request?: Request) => + this.#prometheusMetricsRoute(request), + }; } #ensureServerlessPoolConfigured(config: RegistryConfig): Promise | undefined { @@ -279,7 +287,92 @@ export class Registry { }; } - public async diagnostics(): Promise { + /** + * Returns a health response suitable for mounting in a user-owned router. + */ + async #healthRoute(): Promise { + const configured = await this.#activeConfiguredRegistry(); + if (!configured) { + return jsonRouteResponse(503, { + status: "not_started", + runtime: "rivetkit", + version: VERSION, + }); + } + + const { runtime, registry } = configured; + if (!runtime.registryHealth) { + return jsonRouteResponse(501, { + status: "unsupported", + runtime: "rivetkit", + version: VERSION, + }); + } + + const response = await runtime.registryHealth(registry); + return new Response(new Uint8Array(response.body), { + status: response.status, + headers: response.headers, + }); + } + + /** + * Returns serverless metadata suitable for mounting in a user-owned router. + */ + async #metadataRoute(): Promise { + const configured = await this.#activeConfiguredRegistry(); + if (!configured) { + return new Response("registry not started\n", { + status: 503, + headers: { "content-type": "text/plain; charset=utf-8" }, + }); + } + + const { runtime, registry } = configured; + if (!runtime.registryMetadata) { + return new Response("metadata is not supported by this runtime\n", { + status: 501, + headers: { "content-type": "text/plain; charset=utf-8" }, + }); + } + + const response = await runtime.registryMetadata(registry); + return new Response(new Uint8Array(response.body), { + status: response.status, + headers: response.headers, + }); + } + + /** + * Returns a Prometheus metrics response suitable for mounting in a user-owned router. + */ + async #prometheusMetricsRoute(_request?: Request): Promise { + const configured = await this.#activeConfiguredRegistry(); + if (!configured) { + return new Response("registry not started\n", { + status: 503, + headers: { "content-type": "text/plain; charset=utf-8" }, + }); + } + + const { runtime, registry } = configured; + if (!runtime.registryMetrics) { + return new Response("metrics are not supported by this runtime\n", { + status: 501, + headers: { "content-type": "text/plain; charset=utf-8" }, + }); + } + + const response = await runtime.registryMetrics(registry); + return new Response(new Uint8Array(response.body), { + status: response.status, + headers: response.headers, + }); + } + + async #activeConfiguredRegistry(): Promise< + Awaited> | undefined + > { const candidates = [ this.#runtimeServerlessPromise, this.#runtimeServeConfiguredPromise, @@ -287,13 +380,8 @@ export class Registry { candidate !== undefined ); - for (const candidate of candidates) { - const { runtime, registry } = await candidate; - const diagnostics = await runtime.registryDiagnostics?.(registry); - if (diagnostics) return diagnostics; - } - - return { mode: "not_started", envoyActiveActorCount: null }; + if (candidates.length === 0) return undefined; + return await candidates[0]!; } /** @@ -524,6 +612,13 @@ function isServerlessMetadataRequest(request: Request, basePath: string): boolea return parsed.pathname === `${normalizedBase}/metadata`; } +function jsonRouteResponse(status: number, body: unknown): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "content-type": "application/json" }, + }); +} + export function setup( input: RegistryConfigInput, ): Registry { diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts index f1b8890a34..29cc744bbf 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/napi-runtime.ts @@ -24,7 +24,7 @@ import type { RuntimeQueueTryNextBatchOptions, RuntimeQueueWaitOptions, RuntimeRequestSaveOpts, - RuntimeRegistryDiagnostics, + RuntimeRegistryRouteResponse, RuntimeServeConfig, RuntimeServerlessRequest, RuntimeServerlessResponseHead, @@ -202,13 +202,36 @@ export class NapiCoreRuntime implements CoreRuntime { await asNativeRegistry(registry).shutdown(); } - async registryDiagnostics( + async registryHealth( registry: RegistryHandle, - ): Promise { - const diagnostics = await asNativeRegistry(registry).diagnostics(); + ): Promise { + const response = await asNativeRegistry(registry).health(); return { - mode: diagnostics.mode, - envoyActiveActorCount: diagnostics.envoyActiveActorCount, + status: response.status, + headers: response.headers, + body: response.body, + }; + } + + async registryMetadata( + registry: RegistryHandle, + ): Promise { + const response = asNativeRegistry(registry).metadata(); + return { + status: response.status, + headers: response.headers, + body: response.body, + }; + } + + async registryMetrics( + registry: RegistryHandle, + ): Promise { + const response = asNativeRegistry(registry).metrics(); + return { + status: response.status, + headers: response.headers, + body: response.body, }; } diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts index 0e4875e966..378dbae1ff 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/runtime.ts @@ -257,9 +257,10 @@ export interface RuntimeServerlessResponseHead { headers: Record; } -export interface RuntimeRegistryDiagnostics { - mode: string; - envoyActiveActorCount?: number | null; +export interface RuntimeRegistryRouteResponse { + status: number; + headers: Record; + body: RuntimeBytes; } export type RuntimeServerlessStreamEvent = @@ -316,9 +317,15 @@ export interface CoreRuntime { cancelToken: CancellationTokenHandle, config: RuntimeServeConfig, ): Promise; - registryDiagnostics?( + registryHealth?( + registry: RegistryHandle, + ): Promise; + registryMetadata?( + registry: RegistryHandle, + ): Promise; + registryMetrics?( registry: RegistryHandle, - ): Promise; + ): Promise; createActorFactory( callbacks: object, config?: RuntimeActorConfig | undefined | null, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts index dbb9028dbf..07fdfdd416 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/wasm-runtime.ts @@ -33,6 +33,7 @@ import type { RuntimeQueueTryNextBatchOptions, RuntimeQueueWaitOptions, RuntimeRequestSaveOpts, + RuntimeRegistryRouteResponse, RuntimeServeConfig, RuntimeServerlessRequest, RuntimeServerlessResponseHead, @@ -270,8 +271,18 @@ export class WasmCoreRuntime implements CoreRuntime { await callWasm(() => asWasmRegistry(registry).shutdown()); } - async registryDiagnostics(): Promise<{ mode: string; envoyActiveActorCount: null }> { - return { mode: "wasm", envoyActiveActorCount: null }; + async registryHealth(): Promise { + return { + status: 200, + headers: { "content-type": "application/json" }, + body: new TextEncoder().encode( + JSON.stringify({ + status: "ok", + runtime: "rivetkit", + version: "wasm", + }), + ), + }; } async handleServerlessRequest(