Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod actor;
pub mod engine_process;
pub mod error;
pub mod inspector;
pub(crate) mod metrics_endpoint;
pub mod registry;
pub mod runtime;
pub mod serverless;
Expand Down
91 changes: 91 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::collections::HashMap;

use anyhow::{Context, Result};
use rivet_metrics::prometheus::{Encoder, TextEncoder};
use subtle::ConstantTimeEq;

const METRICS_ENABLED_ENV: &str = "RIVETKIT_METRICS_ENABLED";
const METRICS_TOKEN_ENV: &str = "RIVETKIT_METRICS_TOKEN";

pub(crate) struct RenderedMetrics {
pub(crate) content_type: String,
pub(crate) body: Vec<u8>,
}

pub(crate) enum MetricsAccessError {
NotEnabled,
Unauthorized,
}

pub(crate) fn authorize_metrics_request(
bearer_token: Option<&str>,
) -> std::result::Result<(), MetricsAccessError> {
let Some(configured_token) = configured_metrics_token() else {
return Err(MetricsAccessError::NotEnabled);
};

let Some(bearer_token) = bearer_token.filter(|token| !token.is_empty()) else {
return Err(MetricsAccessError::Unauthorized);
};

if bearer_token.as_bytes().ct_eq(configured_token.as_bytes()).into() {
Ok(())
} else {
Err(MetricsAccessError::Unauthorized)
}
}

pub(crate) fn render_prometheus_metrics() -> Result<RenderedMetrics> {
let encoder = TextEncoder::new();
let metric_families = rivet_metrics::REGISTRY.gather();
let mut body = Vec::new();
encoder
.encode(&metric_families, &mut body)
.context("encode prometheus metrics")?;

Ok(RenderedMetrics {
content_type: encoder.format_type().to_owned(),
body,
})
}

pub(crate) fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> {
headers
.get(http::header::AUTHORIZATION)
.and_then(|value| value.to_str().ok())
.and_then(bearer_token_from_authorization)
}

pub(crate) fn authorization_bearer_token_map(headers: &HashMap<String, String>) -> Option<&str> {
headers
.iter()
.find(|(name, _)| name.eq_ignore_ascii_case(http::header::AUTHORIZATION.as_str()))
.and_then(|(_, value)| bearer_token_from_authorization(value))
}

fn configured_metrics_token() -> Option<String> {
let enabled = std::env::var(METRICS_ENABLED_ENV).ok()?;
if enabled != "1" {
return None;
}

std::env::var(METRICS_TOKEN_ENV)
.ok()
.filter(|token| !token.is_empty())
}

fn bearer_token_from_authorization(value: &str) -> Option<&str> {
let value = value.trim_start();
let scheme = value.get(..6)?;
if !scheme.eq_ignore_ascii_case("bearer") {
return None;
}

let rest = value.get(6..)?;
if !rest.chars().next().is_some_and(char::is_whitespace) {
return None;
}

let token = rest.trim_start();
if token.is_empty() { None } else { Some(token) }
}
41 changes: 39 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/registry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ impl RegistryDispatcher {
request.uri().path(),
self.handle_inspector_http_in_runtime,
)?;
if matches!(route, RegistryHttpRoute::Framework(FrameworkHttpRoute::Metrics)) {
return handle_metrics_fetch(&request);
}
let instance = match self.active_actor(actor_id).await {
Ok(instance) => instance,
Err(error) => {
Expand Down Expand Up @@ -114,6 +117,7 @@ impl RegistryDispatcher {
}
FrameworkHttpRoute::Metadata => handle_metadata_fetch(&request, Some(&actor)),
FrameworkHttpRoute::Health => handle_health_fetch(&request, Some(&actor)),
FrameworkHttpRoute::Metrics => handle_metrics_fetch(&request),
FrameworkHttpRoute::Root => handle_root_fetch(&request, Some(&actor)),
FrameworkHttpRoute::NotFound => handle_not_found_fetch(&request, Some(&actor)),
}
Expand Down Expand Up @@ -416,6 +420,7 @@ impl RegistryHttpRoute {
match normalized_path {
"/metadata" => Ok(Self::Framework(FrameworkHttpRoute::Metadata)),
"/health" => Ok(Self::Framework(FrameworkHttpRoute::Health)),
"/metrics" => Ok(Self::Framework(FrameworkHttpRoute::Metrics)),
"/" => Ok(Self::Framework(FrameworkHttpRoute::Root)),
_ => Ok(Self::Framework(FrameworkHttpRoute::NotFound)),
}
Expand All @@ -427,6 +432,7 @@ pub(super) enum FrameworkHttpRoute {
Queue(String),
Metadata,
Health,
Metrics,
Root,
NotFound,
}
Expand Down Expand Up @@ -466,6 +472,29 @@ fn handle_health_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Res
text_response(StatusCode::OK, "ok")
}

fn handle_metrics_fetch(request: &Request) -> Result<HttpResponse> {
if request.method() != http::Method::GET {
return method_not_allowed_response(request, None);
}

let bearer_token = crate::metrics_endpoint::authorization_bearer_token(request.headers());
match crate::metrics_endpoint::authorize_metrics_request(bearer_token) {
Ok(()) => {
let metrics = crate::metrics_endpoint::render_prometheus_metrics()?;
bytes_response(StatusCode::OK, &metrics.content_type, metrics.body)
}
Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => {
text_response(StatusCode::FORBIDDEN, "metrics not enabled\n")
}
Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => {
text_response(
StatusCode::UNAUTHORIZED,
"metrics request requires a valid bearer token\n",
)
}
}
}

fn handle_root_fetch(request: &Request, actor: Option<&ActorSpecifier>) -> Result<HttpResponse> {
if request.method() != http::Method::GET {
return method_not_allowed_response(request, actor);
Expand Down Expand Up @@ -494,15 +523,23 @@ fn handle_not_found_fetch(
}

fn text_response(status: StatusCode, body: &str) -> Result<HttpResponse> {
bytes_response(
status,
"text/plain; charset=utf-8",
body.as_bytes().to_vec(),
)
}

fn bytes_response(status: StatusCode, content_type: &str, body: Vec<u8>) -> Result<HttpResponse> {
let mut headers = HashMap::new();
headers.insert(
http::header::CONTENT_TYPE.to_string(),
"text/plain; charset=utf-8".to_owned(),
content_type.to_owned(),
);
Ok(HttpResponse {
status: status.as_u16(),
headers,
body: Some(body.as_bytes().to_vec()),
body: Some(body),
body_stream: None,
})
}
Expand Down
25 changes: 25 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ impl CoreServerlessRuntime {
}
}
("GET", "/metadata") => Ok(self.metadata_response()),
("GET", "/metrics") => Ok(metrics_response(&req.headers)),
("GET", "/start") | ("POST", "/start") => self.start_response(req).await,
("OPTIONS", _) => Ok(bytes_response(
StatusCode::NO_CONTENT,
Expand Down Expand Up @@ -625,6 +626,30 @@ fn json_response(status: StatusCode, body: serde_json::Value) -> ServerlessRespo
)
}

fn metrics_response(headers: &HashMap<String, String>) -> ServerlessResponse {
let bearer_token = crate::metrics_endpoint::authorization_bearer_token_map(headers);
match crate::metrics_endpoint::authorize_metrics_request(bearer_token) {
Ok(()) => match crate::metrics_endpoint::render_prometheus_metrics() {
Ok(metrics) => bytes_response(
StatusCode::OK,
HashMap::from([("content-type".to_owned(), metrics.content_type)]),
metrics.body,
),
Err(error) => error_response(error),
},
Err(crate::metrics_endpoint::MetricsAccessError::NotEnabled) => text_response(
StatusCode::FORBIDDEN,
"text/plain; charset=utf-8",
"metrics not enabled\n",
),
Err(crate::metrics_endpoint::MetricsAccessError::Unauthorized) => text_response(
StatusCode::UNAUTHORIZED,
"text/plain; charset=utf-8",
"metrics request requires a valid bearer token\n",
),
}
}

fn bytes_response(
status: StatusCode,
headers: HashMap<String, String>,
Expand Down
Loading