diff --git a/api/oss/src/core/evaluations/service.py b/api/oss/src/core/evaluations/service.py index 1c3936973a..127761e81a 100644 --- a/api/oss/src/core/evaluations/service.py +++ b/api/oss/src/core/evaluations/service.py @@ -22,6 +22,7 @@ EvaluationRunDataStep, EvaluationRunDataConcurrency, EvaluationRunData, + JsonSchemas, EvaluationRun, EvaluationRunCreate, EvaluationRunEdit, @@ -1428,6 +1429,9 @@ async def _refresh_metrics( # Resolved metric keys per step (declared schema, else trace-inferred); # become the run's `mappings`. Rewrite only when something was inferred. metrics_keys_by_step: Dict[str, List[Dict[str, str]]] = {} + # Trace-inferred outputs schema per step, persisted onto the run step so + # the UI can type filter columns for schema-less evaluators. + inferred_schemas_by_step: Dict[str, Dict[str, Any]] = {} any_inferred = False for step in refreshable_steps: @@ -1487,6 +1491,7 @@ async def _refresh_metrics( if metrics_keys: any_inferred = True + inferred_schemas_by_step[step.key] = inferred_schema # Record declared + inferred keys; skip [] (would wipe the # step's existing mapping without replacing it). @@ -1504,13 +1509,22 @@ async def _refresh_metrics( # Rewrite mappings only if a schema was inferred this pass; declared-only # runs already have correct mappings. Pass the full set (declared + inferred). + # A closed run can't be edited; skip the persist rather than abort the + # whole refresh (metric recompute/store below still needs to run). if any_inferred and metrics_keys_by_step and run and run.data: - await self._update_run_mappings_from_inferred_metrics( - project_id=project_id, - user_id=user_id, - run=run, - inferred_metrics_keys_by_step=metrics_keys_by_step, - ) + try: + await self._update_run_mappings_from_inferred_metrics( + project_id=project_id, + user_id=user_id, + run=run, + inferred_metrics_keys_by_step=metrics_keys_by_step, + inferred_schemas_by_step=inferred_schemas_by_step, + ) + except EvaluationClosedConflict: + log.info( + "[METRICS] Skipping inferred-schema persistence for closed run", + run_id=run.id, + ) steps_specs: Dict[str, List[MetricSpec]] = dict() @@ -1699,6 +1713,7 @@ async def _update_run_mappings_from_inferred_metrics( user_id: UUID, run: EvaluationRun, inferred_metrics_keys_by_step: Dict[str, List[Dict[str, str]]], + inferred_schemas_by_step: Optional[Dict[str, Dict[str, Any]]] = None, ) -> None: existing_mappings = list(run.data.mappings or []) updated_mappings: List[EvaluationRunDataMapping] = [] @@ -1764,23 +1779,36 @@ def mapping_key( ) ) - if updated_mappings != existing_mappings: - run_data = EvaluationRunData( - steps=run.data.steps, - repeats=run.data.repeats, - mappings=updated_mappings, + existing_steps = list(run.data.steps or []) + updated_steps = existing_steps + if inferred_schemas_by_step: + updated_steps = [] + for step in existing_steps: + inferred_outputs = inferred_schemas_by_step.get(step.key) + if inferred_outputs and (not step.schemas or not step.schemas.outputs): + updated_steps.append( + step.model_copy( + update={"schemas": JsonSchemas(outputs=inferred_outputs)} + ) + ) + else: + updated_steps.append(step) + + if updated_mappings != existing_mappings or updated_steps != existing_steps: + run_data = run.data.model_copy( + update={"steps": updated_steps, "mappings": updated_mappings} + ) + run_edit = EvaluationRunEdit( + **run.model_dump( + include=set(EvaluationRunEdit.model_fields) - {"data"}, + exclude_none=True, + ), + data=run_data, ) await self.edit_run( project_id=project_id, user_id=user_id, - run=EvaluationRunEdit( - id=run.id, - name=run.name, - description=run.description, - status=run.status, - flags=run.flags, - data=run_data, - ), + run=run_edit, ) # - EVALUATION QUEUE ------------------------------------------------------- diff --git a/api/oss/src/core/evaluations/types.py b/api/oss/src/core/evaluations/types.py index fbf2b5ae6e..bc7ede8afd 100644 --- a/api/oss/src/core/evaluations/types.py +++ b/api/oss/src/core/evaluations/types.py @@ -35,6 +35,7 @@ # engine is the package that ships it. Importers keep using # `core.evaluations.types.EvaluationStatus` unchanged. from agenta.sdk.models.evaluations import EvaluationStatus # noqa: E402 +from agenta.sdk.models.workflows import JsonSchemas # noqa: E402 class EvaluationClosedConflict(Exception): @@ -257,6 +258,10 @@ class EvaluationRunDataStep(BaseModel): origin: Origin references: Dict[str, Reference] inputs: Optional[List[EvaluationRunDataStepInput]] = None + # Outputs schema inferred from traces when the evaluator declares none. + # Run-scoped (reflects this run's observed outputs), so the immutable + # evaluator revision is never rewritten. Only `outputs` is populated. + schemas: Optional[JsonSchemas] = None class EvaluationRunDataMappingColumn(BaseModel): diff --git a/api/oss/tests/pytest/unit/evaluations/test_inferred_schema_persistence.py b/api/oss/tests/pytest/unit/evaluations/test_inferred_schema_persistence.py new file mode 100644 index 0000000000..c5b948219a --- /dev/null +++ b/api/oss/tests/pytest/unit/evaluations/test_inferred_schema_persistence.py @@ -0,0 +1,150 @@ +""" +`EvaluationsService._update_run_mappings_from_inferred_metrics` persists the +trace-inferred outputs schema onto run steps (so the UI can type filter columns +for evaluators that declare no schema) without clobbering unrelated run data. + +These are unit tests: the method only calls `self.edit_run`, so we build a bare +service, stub `edit_run`, and assert on the `EvaluationRunEdit` it receives. +""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock +from uuid import uuid4 + +import pytest + +from oss.src.core.evaluations.service import EvaluationsService +from oss.src.core.evaluations.types import ( + EvaluationRun, + EvaluationRunData, + EvaluationRunDataStep, + EvaluationRunDataConcurrency, + JsonSchemas, +) +from agenta.sdk.models.workflows import JsonSchemas as _SdkJsonSchemas # noqa: F401 + + +def _service_with_stubbed_edit(): + service = object.__new__(EvaluationsService) + service.edit_run = AsyncMock() # type: ignore[attr-defined] + return service + + +def _annotation_step(key: str, schemas=None) -> EvaluationRunDataStep: + return EvaluationRunDataStep( + key=key, + type="annotation", + origin="custom", + references={}, + schemas=schemas, + ) + + +def _run(steps, *, tags=None, meta=None, concurrency=None) -> EvaluationRun: + return EvaluationRun( + id=uuid4(), + name="run", + description="desc", + tags=tags, + meta=meta, + data=EvaluationRunData( + steps=steps, + mappings=[], + concurrency=concurrency, + ), + ) + + +def _edited_run(service) -> SimpleNamespace: + assert service.edit_run.await_count == 1 + return service.edit_run.await_args.kwargs["run"] + + +@pytest.mark.asyncio +async def test_inferred_schema_persisted_onto_schemaless_step(): + service = _service_with_stubbed_edit() + run = _run([_annotation_step("evaluator-x")]) + inferred = { + "type": "object", + "properties": { + "myscore": {"type": "integer"}, + "success": {"type": "boolean"}, + }, + } + + await service._update_run_mappings_from_inferred_metrics( + project_id=uuid4(), + user_id=uuid4(), + run=run, + inferred_metrics_keys_by_step={ + "evaluator-x": [ + {"path": "myscore", "type": "numeric/discrete"}, + {"path": "success", "type": "binary"}, + ] + }, + inferred_schemas_by_step={"evaluator-x": inferred}, + ) + + edited = _edited_run(service) + step = edited.data.steps[0] + assert step.schemas is not None + assert step.schemas.outputs == inferred + + +@pytest.mark.asyncio +async def test_existing_step_schema_is_not_overwritten(): + service = _service_with_stubbed_edit() + declared = JsonSchemas( + outputs={"type": "object", "properties": {"a": {"type": "string"}}} + ) + run = _run([_annotation_step("evaluator-x", schemas=declared)]) + + await service._update_run_mappings_from_inferred_metrics( + project_id=uuid4(), + user_id=uuid4(), + run=run, + inferred_metrics_keys_by_step={ + "evaluator-x": [{"path": "b", "type": "numeric/discrete"}] + }, + inferred_schemas_by_step={ + "evaluator-x": {"type": "object", "properties": {"b": {"type": "integer"}}} + }, + ) + + # Mappings still changed (new inferred column), so edit_run is called, but + # the step keeps its declared schema rather than the inferred one. + edited = _edited_run(service) + step = next(s for s in edited.data.steps if s.key == "evaluator-x") + assert step.schemas.outputs == declared.outputs + + +@pytest.mark.asyncio +async def test_full_put_preserves_unrelated_run_data(): + service = _service_with_stubbed_edit() + concurrency = EvaluationRunDataConcurrency(batch_size=7, max_retries=3) + run = _run( + [_annotation_step("evaluator-x")], + tags={"team": "evals"}, + meta={"note": "keep me"}, + concurrency=concurrency, + ) + + await service._update_run_mappings_from_inferred_metrics( + project_id=uuid4(), + user_id=uuid4(), + run=run, + inferred_metrics_keys_by_step={ + "evaluator-x": [{"path": "myscore", "type": "numeric/discrete"}] + }, + inferred_schemas_by_step={ + "evaluator-x": { + "type": "object", + "properties": {"myscore": {"type": "integer"}}, + } + }, + ) + + edited = _edited_run(service) + assert edited.tags == {"team": "evals"} + assert edited.meta == {"note": "keep me"} + assert edited.data.concurrency == concurrency diff --git a/web/oss/src/components/EvalRunDetails/atoms/table/columns.ts b/web/oss/src/components/EvalRunDetails/atoms/table/columns.ts index fd04f1fc8e..189984e503 100644 --- a/web/oss/src/components/EvalRunDetails/atoms/table/columns.ts +++ b/web/oss/src/components/EvalRunDetails/atoms/table/columns.ts @@ -1,3 +1,4 @@ +import {extractMetrics} from "@agenta/entities/workflow" import {atom} from "jotai" import {atomFamily} from "jotai/utils" @@ -329,6 +330,24 @@ const tableColumnsBaseAtomFamily = atomFamily((runId: string | null) => ? runData.camelRun.data.mappings : [] + // Per-step metric definitions derived from the outputs schema the + // backend inferred from traces and stored on the run step. This is the + // type source for schema-less evaluators, whose immutable revision + // carries no schema. Keyed by step key. + const stepSchemaMetricsByStepKey = new Map>() + const runSteps = Array.isArray(runData.camelRun?.data?.steps) + ? runData.camelRun.data.steps + : [] + for (const step of runSteps) { + const outputs = (step as {schemas?: {outputs?: unknown}})?.schemas?.outputs + const stepKey = (step as {key?: string})?.key + if (!outputs || !stepKey) continue + stepSchemaMetricsByStepKey.set( + stepKey, + extractMetrics({id: stepKey, slug: stepKey, data: {schemas: {outputs}}}), + ) + } + const counters: Record<"input" | "invocation" | "annotation", number> = { input: 0, invocation: 0, @@ -506,13 +525,22 @@ const tableColumnsBaseAtomFamily = atomFamily((runId: string | null) => // canonical-key-only match misses and `metricType` falls back // to "string", mis-typing the column (e.g. a boolean output). const metricKey = column.metricKey || column.valueKey - const metricDefinition = evaluator?.metrics.find( - (metric) => - metric.name === metricKey || - metric.path === metricKey || - metric.name === column.valueKey || - metric.path === column.valueKey, - ) + const matchMetric = (metrics: ReturnType | undefined) => + metrics?.find( + (metric) => + metric.name === metricKey || + metric.path === metricKey || + metric.name === column.valueKey || + metric.path === column.valueKey, + ) + // Schema-declared evaluator metrics first; else fall back to the + // outputs schema the backend inferred from traces and stored on the + // run step — the only type source for schema-less evaluators, whose + // immutable revision carries no schema. "string" is the cold-start + // fallback before any type is known. + const metricDefinition = + matchMetric(evaluator?.metrics) ?? + matchMetric(stepSchemaMetricsByStepKey.get(column.stepKey ?? "")) const metricType = metricDefinition?.metricType || column.metricType || METRIC_TYPE_FALLBACK const evaluatorLabel = evaluator?.name || column.evaluatorSlug || "Annotations"