5 changes: 5 additions & 0 deletions AGENTS.md
@@ -217,5 +217,10 @@ Scouter accepts: Pandas DataFrames, NumPy 2D arrays, Polars DataFrames, Pydantic
| `REDIS_ADDR` | — | Redis URL |
| `SCOUTER_STORAGE_URI` | `./scouter_storage` | Object storage (S3, GCS, Azure, local) |
| `SCOUTER_TRACE_REFRESH_INTERVAL_SECS` | `10` | How often each pod refreshes its Delta table snapshot from shared storage. Set lower (e.g. `5`) for faster cross-pod visibility; set higher to reduce object-store LIST calls. Only relevant in multi-pod deployments. |
| `SCOUTER_TRACE_VISIBILITY_BUFFER_SECS` | refresh + 2 | Minimum delay before trace-backed evals are polled. Startup panics if this is lower than `SCOUTER_TRACE_REFRESH_INTERVAL_SECS + 2`, because a smaller buffer can make the poller fetch before a pod sees the committed anchor span. |
| `SCOUTER_INBOX_RECONCILE_AFTER_SECS` | `15` | Age floor before reconciliation scans `awaiting_trace` rows for missing anchor queue events. |
| `SCOUTER_INBOX_RECONCILE_LOOKBACK_SECS` | `86400` | Maximum distance reconciliation looks back for an anchor span's start time when querying Delta for anchor spans. Increase for evals attached to spans that can run longer than 24 hours. |
| `SCOUTER_INBOX_RECONCILE_INTERVAL_SECS` | `60` | How often reconciliation recovers dropped live inbox notifications from Delta. |
| `SCOUTER_INBOX_RECONCILE_BATCH` | `200` | Maximum `awaiting_trace` rows scanned per reconciliation tick. |
| `SCOUTER_ENCRYPT_SECRET` | — | HMAC-SHA256 key (32 bytes) |
| `SCOUTER_BOOTSTRAP_KEY` | — | Initial admin bootstrap key |
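The `SCOUTER_TRACE_VISIBILITY_BUFFER_SECS` row above implies a startup invariant: buffer >= refresh + 2. A minimal sketch of that guard, using a hypothetical `visibility_buffer_secs` helper; the real validation lives in Scouter's settings loader and may differ:

// Hypothetical guard for the documented invariant: the visibility buffer
// must cover at least one refresh interval plus 2s of slack, or the poller
// can fetch before any pod's Delta snapshot contains the anchor span.
fn visibility_buffer_secs() -> u64 {
    let refresh: u64 = std::env::var("SCOUTER_TRACE_REFRESH_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10); // documented default
    let buffer: u64 = std::env::var("SCOUTER_TRACE_VISIBILITY_BUFFER_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(refresh + 2); // documented default: refresh + 2
    assert!(
        buffer >= refresh + 2,
        "SCOUTER_TRACE_VISIBILITY_BUFFER_SECS must be >= SCOUTER_TRACE_REFRESH_INTERVAL_SECS + 2"
    );
    buffer
}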
6 changes: 6 additions & 0 deletions Cargo.lock


4 changes: 2 additions & 2 deletions Cargo.toml
@@ -106,7 +106,7 @@ serde_json = "1.*"
serde_yaml = "0.*"
serde_qs = "0.*"
simsimd = "6.5.*"
sqlx = { version = "0.*", features = [ "runtime-tokio", "tls-native-tls", "postgres", "chrono", "json"] }
sqlx = { version = "0.*", features = [ "runtime-tokio", "tls-native-tls", "postgres", "chrono", "json", "uuid"] }
strum = "0.*"
strum_macros = "0.*"
tabled = { version = "0.*", features = ["ansi"] }
@@ -122,7 +122,7 @@ utoipa-swagger-ui = { version = "9", features = ["axum"] }
tokio-util = "0.*"
tracing = "0.*"
tracing-subscriber = {version = "0.*", features = ["json", "time", "env-filter"] }
uuid = { version = "1.*", features = ["v7"] }
uuid = { version = "1.*", features = ["v4", "v7"] }
url = "2.*"
statrs = "0.18.0"

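The Cargo.toml change enables both random and time-ordered UUIDs (and matching `uuid` support in sqlx). A hedged sketch of what the added features provide, standard `uuid` crate API, usage here is illustrative rather than Scouter's code:

use uuid::Uuid;

fn example_ids() {
    // v4: random UUID (newly enabled `v4` feature), order-free identifiers.
    let random_id = Uuid::new_v4();
    // v7: time-ordered UUID (pre-existing `v7` feature), sortable by creation time.
    let ordered_id = Uuid::now_v7();
    println!("{random_id} {ordered_id}");
}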
1 change: 1 addition & 0 deletions crates/scouter_dataframe/Cargo.toml
@@ -29,6 +29,7 @@ datafusion = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
mini-moka = { workspace = true }
metrics = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
98 changes: 80 additions & 18 deletions crates/scouter_dataframe/src/parquet/tracing/engine.rs
@@ -19,12 +19,12 @@ use deltalake::datafusion::parquet::schema::types::ColumnPath;
use deltalake::operations::optimize::OptimizeType;
use deltalake::{DeltaTable, DeltaTableBuilder, TableProperty};
use scouter_settings::ObjectStorageSettings;
use scouter_types::SpanId;
use scouter_types::TraceId;
use scouter_types::TraceSpanRecord;
use scouter_types::{Attribute, SpanEvent, SpanLink};
use scouter_types::{
SCOUTER_EVAL_PROFILE_UID, SCOUTER_EVAL_RECORD_UID, SpanId, TraceCommitAnchor, TraceId,
TraceSpanRecord,
};
use serde_json::Value;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::{RwLock as AsyncRwLock, mpsc};
@@ -221,15 +221,15 @@ pub struct TraceSpanDBEngine {
/// respective `TableProvider`s without a deregister/register gap.
pub catalog: Arc<TraceCatalogProvider>,
control: ControlTableEngine,
commit_tx: Option<mpsc::Sender<Vec<TraceId>>>,
commit_tx: Option<mpsc::Sender<Vec<TraceCommitAnchor>>>,
}

impl TraceSchemaExt for TraceSpanDBEngine {}

impl TraceSpanDBEngine {
pub async fn new(
storage_settings: &ObjectStorageSettings,
commit_tx: Option<mpsc::Sender<Vec<TraceId>>>,
commit_tx: Option<mpsc::Sender<Vec<TraceCommitAnchor>>>,
) -> Result<Self, TraceEngineError> {
let object_store = ObjectStore::new(storage_settings)?;
let schema = Arc::new(Self::create_schema());
@@ -629,28 +629,90 @@ impl TraceSpanDBEngine {
Some(cmd) = rx.recv() => {
match cmd {
TableCommand::Write { spans, respond_to } => {
let trace_ids: Vec<TraceId> = if self.commit_tx.is_some() {
let mut seen = HashSet::with_capacity(spans.len());
let anchors: Vec<TraceCommitAnchor> = if self.commit_tx.is_some() {
let mut out = Vec::new();
for span in &spans {
seen.insert(span.trace_id);
let mut record_uid: Option<String> = None;
let mut profile_uid: Option<String> = None;
for attr in &span.attributes {
if attr.key == SCOUTER_EVAL_RECORD_UID {
if let Some(value) = attr.value.as_str() {
record_uid = Some(value.to_string());
}
} else if attr.key == SCOUTER_EVAL_PROFILE_UID
&& let Some(value) = attr.value.as_str()
{
profile_uid = Some(value.to_string());
}

if record_uid.is_some() && profile_uid.is_some() {
break;
}
}
if let (Some(record_uid), Some(profile_uid)) =
(record_uid, profile_uid)
{
if let Some(anchor) = TraceCommitAnchor::new(
span.trace_id,
span.span_id.clone(),
record_uid,
profile_uid,
) {
out.push(anchor);
} else {
metrics::counter!(
"scouter_trace_commit_event_invalid_total"
)
.increment(1);
tracing::warn!(
span_id = %span.span_id,
"dropping invalid trace eval anchor attributes"
);
}
}
}
seen.into_iter().collect()
out
} else {
Vec::new()
};

match self.write_spans(spans).await {
Ok(_) => {
if let Some(tx) = &self.commit_tx
&& !trace_ids.is_empty()
&& !anchors.is_empty()
{
let trace_count = trace_ids.len();
if let Err(e) = tx.send(trace_ids).await {
tracing::warn!(
trace_count,
"trace-arrival commit_tx closed after Delta commit ({:?}); timeout sweep will recover affected eval rows",
e
);
let anchor_count = anchors.len();
match tx.try_send(anchors) {
Ok(()) => {
metrics::counter!(
"scouter_trace_commit_event_channel_sent_total"
)
.increment(anchor_count as u64);
}
Err(tokio::sync::mpsc::error::TrySendError::Full(
_,
)) => {
metrics::counter!(
"scouter_trace_commit_event_channel_drop_total"
)
.increment(anchor_count as u64);
tracing::warn!(
anchor_count,
"trace-anchor channel full; dropping live notification, reconciliation sweep will recover"
);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(
_,
)) => {
metrics::counter!(
"scouter_trace_commit_event_channel_drop_total"
)
.increment(anchor_count as u64);
tracing::error!(
anchor_count,
"trace-anchor channel closed; inbox consumer is dead, reconciliation sweep is now the sole recovery path"
);
}
}
}
let _ = respond_to.send(Ok(()));
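The engine change above replaces a blocking `send(...).await` with `try_send`, so a slow or dead inbox consumer can never stall the Delta writer loop; dropped notifications are counted and left to the reconciliation sweep. A reduced, standalone sketch of that pattern (illustrative names, not Scouter's actual types):

use tokio::sync::mpsc;

// Non-blocking notify after a successful commit: prefer dropping the live
// notification over stalling the writer, and record the drop so the
// reconciliation sweep's recovery load is observable.
async fn notify_anchors(tx: &mpsc::Sender<Vec<u64>>, anchors: Vec<u64>) {
    let n = anchors.len() as u64;
    match tx.try_send(anchors) {
        // Consumer keeps up: the live path delivers the notification.
        Ok(()) => metrics::counter!("anchors_sent_total").increment(n),
        // Channel full or closed: drop rather than block; the periodic
        // reconciliation sweep recovers the lost notifications from Delta.
        Err(e) => {
            metrics::counter!("anchors_dropped_total").increment(n);
            tracing::warn!(count = n, "dropping anchor notification: {e}");
        }
    }
}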
150 changes: 149 additions & 1 deletion crates/scouter_dataframe/src/parquet/tracing/queries.rs
@@ -15,7 +15,10 @@ use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use mini_moka::sync::Cache;
use scouter_types::sql::{TraceFilters, TraceMetricBucket, TraceSpan};
use scouter_types::{Attribute, SpanEvent, SpanId, SpanLink, TraceId};
use scouter_types::{
Attribute, AwaitingTraceCommit, SCOUTER_EVAL_PROFILE_UID, SCOUTER_EVAL_RECORD_UID, SpanEvent,
SpanId, SpanLink, TraceCommitAnchor, TraceId,
};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@@ -767,6 +770,7 @@ fn flat_to_trace_span(
/// Time predicates are always applied FIRST to enable Delta Lake partition pruning.
/// `span_cache` provides sub-millisecond repeat reads for trace detail clicks.
/// `metrics_cache` provides sub-millisecond repeat reads for dashboard metric charts.
#[derive(Clone)]
pub struct TraceQueries {
ctx: Arc<SessionContext>,
/// LRU cache keyed by 16-byte trace ID. TTL=5 min — archived span data is immutable.
@@ -941,6 +945,150 @@ impl TraceQueries {
Ok(build_span_tree(flat_spans))
}

/// Find committed anchor spans for awaiting eval rows.
///
/// Reconciliation bounds partition_date by the supported anchor span duration and end_time by
/// the eval arrival window before ID filters. Anchor spans can start long before attach_eval,
/// but they only commit after ending.
pub async fn find_anchor_spans_for_records(
&self,
records: &[AwaitingTraceCommit],
lookback: chrono::Duration,
trace_arrival_timeout: chrono::Duration,
) -> Result<Vec<TraceCommitAnchor>, TraceEngineError> {
if records.is_empty() {
return Ok(Vec::new());
}

let mut by_trace: HashMap<TraceId, Vec<&AwaitingTraceCommit>> = HashMap::new();
for record in records {
by_trace.entry(record.trace_id).or_default().push(record);
}

let mut anchors = Vec::new();
for (trace_id, trace_records) in by_trace {
let Some(window_start) = trace_records
.iter()
.map(|record| record.created_at - lookback)
.min()
else {
continue;
};
let Some(window_end) = trace_records
.iter()
.map(|record| record.created_at + trace_arrival_timeout)
.max()
else {
continue;
};

let expected: HashMap<SpanId, &AwaitingTraceCommit> = trace_records
.iter()
.map(|record| (record.span_id.clone(), *record))
.collect();
let span_literals: Vec<Expr> = expected
.keys()
.map(|span_id| lit(ScalarValue::Binary(Some(span_id.as_bytes().to_vec()))))
.collect();

let mut builder =
TraceQueryBuilder::set_table(self.ctx.clone(), SPAN_TABLE_NAME).await?;
builder = builder.add_filter(col(PARTITION_DATE_COL).gt_eq(date_lit(&window_start)))?;
builder = builder.add_filter(col(PARTITION_DATE_COL).lt_eq(date_lit(&window_end)))?;
builder = builder.add_filter(col(END_TIME_COL).gt_eq(ts_lit(&window_start)))?;
builder = builder.add_filter(col(END_TIME_COL).lt(ts_lit(&window_end)))?;
builder = builder.add_filter(
col(TRACE_ID_COL).eq(lit(ScalarValue::Binary(Some(trace_id.as_bytes().to_vec())))),
)?;
builder = builder.add_filter(col(SPAN_ID_COL).in_list(span_literals, false))?;
builder = builder.select_columns(&[TRACE_ID_COL, SPAN_ID_COL, ATTRIBUTES_COL])?;

for batch in builder.execute().await? {
let schema = batch.schema();
let trace_idx = schema.index_of(TRACE_ID_COL).map_err(|_| {
TraceEngineError::BatchConversion("Missing column: trace_id".into())
})?;
let span_idx = schema.index_of(SPAN_ID_COL).map_err(|_| {
TraceEngineError::BatchConversion("Missing column: span_id".into())
})?;
let attrs_idx = schema.index_of(ATTRIBUTES_COL).map_err(|_| {
TraceEngineError::BatchConversion("Missing column: attributes".into())
})?;

let trace_arr =
cast(batch.column(trace_idx).as_ref(), &DataType::Binary).map_err(|e| {
TraceEngineError::BatchConversion(format!("trace_id cast: {e}"))
})?;
let trace_col = trace_arr
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
TraceEngineError::BatchConversion("trace_id not BinaryArray".into())
})?;
let span_arr = cast(batch.column(span_idx).as_ref(), &DataType::Binary)
.map_err(|e| TraceEngineError::BatchConversion(format!("span_id cast: {e}")))?;
let span_col =
span_arr
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| {
TraceEngineError::BatchConversion("span_id not BinaryArray".into())
})?;
let attrs_col = batch
.column(attrs_idx)
.as_any()
.downcast_ref::<MapArray>()
.ok_or_else(|| {
TraceEngineError::BatchConversion("attributes not MapArray".into())
})?;

for row_idx in 0..batch.num_rows() {
let trace_bytes: [u8; 16] =
trace_col.value(row_idx).try_into().map_err(|_| {
TraceEngineError::BatchConversion(
"trace_id must be exactly 16 bytes".into(),
)
})?;
let span_bytes: [u8; 8] = span_col.value(row_idx).try_into().map_err(|_| {
TraceEngineError::BatchConversion("span_id must be exactly 8 bytes".into())
})?;
let row_trace_id = TraceId::from_bytes(trace_bytes);
let row_span_id = SpanId::from_bytes(span_bytes);
let Some(expected_record) = expected.get(&row_span_id) else {
continue;
};

let attrs = extract_attributes(attrs_col, row_idx);
let record_uid = attrs
.iter()
.find(|attr| attr.key == SCOUTER_EVAL_RECORD_UID)
.and_then(|attr| attr.value.as_str());
if record_uid != Some(expected_record.record_uid.as_str()) {
continue;
}
let Some(profile_uid) = attrs
.iter()
.find(|attr| attr.key == SCOUTER_EVAL_PROFILE_UID)
.and_then(|attr| attr.value.as_str())
else {
continue;
};

if let Some(anchor) = TraceCommitAnchor::new(
row_trace_id,
row_span_id,
expected_record.record_uid.clone(),
profile_uid.to_string(),
) {
anchors.push(anchor);
}
}
}
}

Ok(anchors)
}

/// Get trace metrics over a time range, bucketed by the given interval string.
///
/// `bucket_interval` must be a valid DataFusion `DATE_TRUNC` precision unit:
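The doc comment on `find_anchor_spans_for_records` compresses the key query-shape decision: per-trace time bounds are derived from eval arrival times and applied before the trace/span ID filters so Delta can prune partitions. A sketch of just the window computation, assuming a stand-in `Awaiting` type for `AwaitingTraceCommit`:

use chrono::{DateTime, Duration, Utc};

struct Awaiting {
    created_at: DateTime<Utc>,
}

// Anchor spans may start long before attach_eval but only commit after
// ending, so bound end_time by [earliest eval - lookback, latest eval +
// arrival timeout]; these time predicates go first so Delta can prune
// partitions before the ID filters run.
fn query_window(
    records: &[Awaiting],
    lookback: Duration,        // from SCOUTER_INBOX_RECONCILE_LOOKBACK_SECS
    arrival_timeout: Duration, // eval arrival window
) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
    let start = records.iter().map(|r| r.created_at - lookback).min()?;
    let end = records.iter().map(|r| r.created_at + arrival_timeout).max()?;
    Some((start, end))
}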