diff --git a/rivetkit-rust/packages/rivetkit-core/src/lib.rs b/rivetkit-rust/packages/rivetkit-core/src/lib.rs index 76e4bce759..1adbcbb46e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/lib.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/lib.rs @@ -11,6 +11,7 @@ pub mod actor; pub mod engine_process; pub mod error; pub mod inspector; +pub(crate) mod metrics_endpoint; pub mod registry; pub mod runtime; pub mod serverless; diff --git a/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs b/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs new file mode 100644 index 0000000000..dac3386e5c --- /dev/null +++ b/rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs @@ -0,0 +1,91 @@ +use std::collections::HashMap; + +use anyhow::{Context, Result}; +use rivet_metrics::prometheus::{Encoder, TextEncoder}; +use subtle::ConstantTimeEq; + +const METRICS_ENABLED_ENV: &str = "RIVETKIT_METRICS_ENABLED"; +const METRICS_TOKEN_ENV: &str = "RIVETKIT_METRICS_TOKEN"; + +pub(crate) struct RenderedMetrics { + pub(crate) content_type: String, + pub(crate) body: Vec, +} + +pub(crate) enum MetricsAccessError { + NotEnabled, + Unauthorized, +} + +pub(crate) fn authorize_metrics_request( + bearer_token: Option<&str>, +) -> std::result::Result<(), MetricsAccessError> { + let Some(configured_token) = configured_metrics_token() else { + return Err(MetricsAccessError::NotEnabled); + }; + + let Some(bearer_token) = bearer_token.filter(|token| !token.is_empty()) else { + return Err(MetricsAccessError::Unauthorized); + }; + + if bearer_token.as_bytes().ct_eq(configured_token.as_bytes()).into() { + Ok(()) + } else { + Err(MetricsAccessError::Unauthorized) + } +} + +pub(crate) fn render_prometheus_metrics() -> Result { + let encoder = TextEncoder::new(); + let metric_families = rivet_metrics::REGISTRY.gather(); + let mut body = Vec::new(); + encoder + .encode(&metric_families, &mut body) + .context("encode prometheus metrics")?; + + Ok(RenderedMetrics { + content_type: encoder.format_type().to_owned(), + body, + }) +} + +pub(crate) fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> { + headers + .get(http::header::AUTHORIZATION) + .and_then(|value| value.to_str().ok()) + .and_then(bearer_token_from_authorization) +} + +pub(crate) fn authorization_bearer_token_map(headers: &HashMap) -> Option<&str> { + headers + .iter() + .find(|(name, _)| name.eq_ignore_ascii_case(http::header::AUTHORIZATION.as_str())) + .and_then(|(_, value)| bearer_token_from_authorization(value)) +} + +fn configured_metrics_token() -> Option { + let enabled = std::env::var(METRICS_ENABLED_ENV).ok()?; + if enabled != "1" { + return None; + } + + std::env::var(METRICS_TOKEN_ENV) + .ok() + .filter(|token| !token.is_empty()) +} + +fn bearer_token_from_authorization(value: &str) -> Option<&str> { + let value = value.trim_start(); + let scheme = value.get(..6)?; + if !scheme.eq_ignore_ascii_case("bearer") { + return None; + } + + let rest = value.get(6..)?; + if !rest.chars().next().is_some_and(char::is_whitespace) { + return None; + } + + let token = rest.trim_start(); + if token.is_empty() { None } else { Some(token) } +} diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index 3527621953..838d8b9560 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -21,6 +21,9 @@ impl RegistryDispatcher { request.uri().path(), self.handle_inspector_http_in_runtime, )?; + if matches!(route, RegistryHttpRoute::Framework(FrameworkHttpRoute::Metrics)) { + return handle_metrics_fetch(&request); + } let instance = match self.active_actor(actor_id).await { Ok(instance) => instance, Err(error) => { @@ -114,6 +117,7 @@ impl RegistryDispatcher { } FrameworkHttpRoute::Metadata => handle_metadata_fetch(&request, Some(&actor)), FrameworkHttpRoute::Health => handle_health_fetch(&request, Some(&actor)), + FrameworkHttpRoute::Metrics => handle_metrics_fetch(&request), FrameworkHttpRoute::Root => handle_root_fetch(&request, Some(&actor)), FrameworkHttpRoute::NotFound => handle_not_found_fetch(&request, Some(&actor)), } @@ -416,6 +420,7 @@ impl RegistryHttpRoute { match normalized_path { "/metadata" => Ok(Self::Framework(FrameworkHttpRoute::Metadata)), "/health" => Ok(Self::Framework(FrameworkHttpRoute::Health)), + "/metrics" => Ok(Self::Framework(FrameworkHttpRoute::Metrics)), "/" => Ok(Self::Framework(FrameworkHttpRoute::Root)), _ => Ok(Self::Framework(FrameworkHttpRoute::NotFound)), } @@ -427,6 +432,7 @@ pub(super) enum FrameworkHttpRoute { Queue(String), Metadata, Health, + Metrics, Root, NotFound, } @@ -466,6 +472,29 @@ fn handle_health_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Res text_response(StatusCode::OK, "ok") } +fn handle_metrics_fetch(request: &Request) -> Result { + if request.method() != http::Method::GET { + return method_not_allowed_response(request, None); + } + + let bearer_token = crate::metrics_endpoint::authorization_bearer_token(request.headers()); + match crate::metrics_endpoint::authorize_metrics_request(bearer_token) { + Ok(()) => { + let metrics = crate::metrics_endpoint::render_prometheus_metrics()?; + bytes_response(StatusCode::OK, &metrics.content_type, metrics.body) + } + Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => { + text_response(StatusCode::FORBIDDEN, "metrics not enabled\n") + } + Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => { + text_response( + StatusCode::UNAUTHORIZED, + "metrics request requires a valid bearer token\n", + ) + } + } +} + fn handle_root_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Result { if request.method() != http::Method::GET { return method_not_allowed_response(request, actor); @@ -494,15 +523,23 @@ fn handle_not_found_fetch( } fn text_response(status: StatusCode, body: &str) -> Result { + bytes_response( + status, + "text/plain; charset=utf-8", + body.as_bytes().to_vec(), + ) +} + +fn bytes_response(status: StatusCode, content_type: &str, body: Vec) -> Result { let mut headers = HashMap::new(); headers.insert( http::header::CONTENT_TYPE.to_string(), - "text/plain; charset=utf-8".to_owned(), + content_type.to_owned(), ); Ok(HttpResponse { status: status.as_u16(), headers, - body: Some(body.as_bytes().to_vec()), + body: Some(body), body_stream: None, }) } diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index 284fc6c166..651fa62cd6 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -279,6 +279,7 @@ impl CoreServerlessRuntime { } } ("GET", "/metadata") => Ok(self.metadata_response()), + ("GET", "/metrics") => Ok(metrics_response(&req.headers)), ("GET", "/start") | ("POST", "/start") => self.start_response(req).await, ("OPTIONS", _) => Ok(bytes_response( StatusCode::NO_CONTENT, @@ -625,6 +626,30 @@ fn json_response(status: StatusCode, body: serde_json::Value) -> ServerlessRespo ) } +fn metrics_response(headers: &HashMap) -> ServerlessResponse { + let bearer_token = crate::metrics_endpoint::authorization_bearer_token_map(headers); + match crate::metrics_endpoint::authorize_metrics_request(bearer_token) { + Ok(()) => match crate::metrics_endpoint::render_prometheus_metrics() { + Ok(metrics) => bytes_response( + StatusCode::OK, + HashMap::from([("content-type".to_owned(), metrics.content_type)]), + metrics.body, + ), + Err(error) => error_response(error), + }, + Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => text_response( + StatusCode::FORBIDDEN, + "text/plain; charset=utf-8", + "metrics not enabled\n", + ), + Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => text_response( + StatusCode::UNAUTHORIZED, + "text/plain; charset=utf-8", + "metrics request requires a valid bearer token\n", + ), + } +} + fn bytes_response( status: StatusCode, headers: HashMap,