Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ EVA_MODEL__LLM=gpt-5.2
#i Conversation timeout in seconds.
#d int
#r 30,10000,10
#v EVA_CONVERSATION_TIMEOUT_SECONDS=360
#v EVA_CONVERSATION_TIMEOUT_SECONDS=600

#i Maximum rerun attempts for failed records.
#d int
Expand Down
2 changes: 2 additions & 0 deletions src/eva/metrics/diagnostic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from . import authentication_success # noqa
from . import conversation_correctly_finished # noqa
from . import conversation_timeout # noqa
from . import response_speed # noqa
from . import speakability # noqa
from . import stt_wer # noqa
Expand All @@ -11,6 +12,7 @@
__all__ = [
"authentication_success",
"conversation_correctly_finished",
"conversation_timeout",
"response_speed",
"speakability",
"stt_wer",
Expand Down
35 changes: 35 additions & 0 deletions src/eva/metrics/diagnostic/conversation_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Conversation-timeout diagnostic metric."""

from eva.metrics.base import CodeMetric, MetricContext
from eva.metrics.registry import register_metric
from eva.models.results import MetricScore


@register_metric
class ConversationTimeoutMetric(CodeMetric):
    """Diagnostic metric flagging whether a conversation ended by timeout.

    The score is 0.0 when the context's ``conversation_ended_reason`` equals
    ``"timeout"`` and 1.0 for every other value.
    """

    # NOTE(review): tests/fixtures/metric_signatures.json records a source_hash
    # for this class — presumably it must be regenerated whenever this source
    # changes; confirm against the signature test.
    name = "conversation_finished_on_time"
    version = "v0.1"
    description = "Diagnostic metric: 1.0 when conversation finished within time limit, 0.0 on timeout"
    category = "diagnostic"
    exclude_from_pass_at_k = True

    async def compute(self, context: MetricContext) -> MetricScore:
        """Score one conversation from its recorded end reason.

        Args:
            context: Metric context; only ``conversation_ended_reason`` is read.

        Returns:
            A ``MetricScore`` whose ``score`` and ``normalized_score`` are both
            0.0 on timeout and 1.0 otherwise, with the raw reason and the
            boolean flag in ``details``. If anything raises, the result of
            ``self._handle_error`` is returned instead.
        """
        try:
            ended_reason = context.conversation_ended_reason
            hit_timeout = ended_reason == "timeout"

            if hit_timeout:
                value = 0.0
            else:
                value = 1.0

            details = {
                "conversation_ended_reason": ended_reason,
                "timed_out": hit_timeout,
            }
            return MetricScore(
                name=self.name,
                score=value,
                normalized_score=value,
                details=details,
            )

        except Exception as e:
            # Delegate error reporting to the shared handler on the base class.
            return self._handle_error(e, context)
2 changes: 1 addition & 1 deletion src/eva/metrics/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ def _compute_latency_summary(self) -> dict[str, Any]:
Returns a dict with the mean of mean_ms values for each latency type
that has at least one non-null entry.
"""
latency_keys = ["llm_latency", "stt_latency", "tts_latency"]
latency_keys = ["llm_latency", "stt_latency", "tts_latency", "model_response_latency"]
collected: dict[str, list[float]] = {k: [] for k in latency_keys}

for _record_id, record_dir in self._discover_record_dirs(self.run_dir, self.record_ids):
Expand Down
19 changes: 14 additions & 5 deletions src/eva/metrics/validation/user_behavioral_fidelity.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,20 @@ def get_prompt_variables(self, context: MetricContext, transcript_text: str) ->
context.audio_timestamps_user_turns,
context.audio_timestamps_assistant_turns,
)
conversation_end = (
"the agent's failure to respond to the final user turn."
if agent_timeout
else "the user calling the end_call tool."
)
if context.conversation_ended_reason == "timeout":
conversation_end = (
"a system timeout — the conversation exceeded the allowed time limit. "
"The user did NOT end the call; the system terminated the conversation."
)
elif agent_timeout:
conversation_end = "the agent's failure to respond to the final user turn."
elif context.conversation_ended_reason == "inactivity_timeout":
conversation_end = (
"an inactivity timeout — neither the user nor the agent spoke for an extended period. "
"The user did NOT end the call; the system terminated the conversation due to silence."
)
else:
conversation_end = "the user calling the end_call tool."

return {
"conversation_evidence": conversation_evidence,
Expand Down
8 changes: 5 additions & 3 deletions src/eva/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,14 @@ class ModelDeployment(DeploymentTypedDict):
init=False,
)

validation_thresholds: dict[str, float] = Field(
validation_thresholds: dict[str, float | int] = Field(
{
"conversation_valid_end": 1.0,
"user_behavioral_fidelity": 1.0,
"max_timeout_attempts": 1,
},
description="Validation metric thresholds for rerun decisions (JSON)",
description="Validation metric thresholds and settings for rerun decisions (JSON). "
"max_timeout_attempts sets the max number of attempts that timeout before accepting a run for evaluation. Default is 1.",
)

# Multi-attempt (for pass@k evaluation)
Expand Down Expand Up @@ -485,7 +487,7 @@ class ModelDeployment(DeploymentTypedDict):
description="Maximum number of concurrent conversations",
)
conversation_timeout_seconds: int = Field(
360,
600,
ge=30,
le=10000,
description="Timeout for each conversation in seconds",
Expand Down
4 changes: 4 additions & 0 deletions src/eva/models/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class ConversationResult(BaseModel):
)
initial_scenario_db_hash: str | None = Field(None, description="SHA-256 hash of initial scenario database")
final_scenario_db_hash: str | None = Field(None, description="SHA-256 hash of final scenario database")
timeout_accepted: bool = Field(
False,
description="Whether this record was accepted after exhausting timeout attempts (gate bypass)",
)


class MetricScore(BaseModel):
Expand Down
117 changes: 116 additions & 1 deletion src/eva/orchestrator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ async def run(self, records: list[EvaluationRecord]) -> RunResult:
all_output_ids = list(output_id_to_record.keys())
pending_output_ids = list(all_output_ids)
rerun_history: dict[str, list[dict]] = {}
timeout_attempt_counts: dict[str, int] = {}
timeout_validation_cache: dict[str, dict[int, ValidationResult]] = {}
max_timeout_attempts = int(self.config.validation_thresholds.get("max_timeout_attempts", 3))
timeout_accepted_ids: set[str] = set()
started_at = datetime.now()

# Initialize port pool once before the attempt loop.
Expand Down Expand Up @@ -272,8 +276,11 @@ async def _run_and_pipeline(
not_finished_ids: list[str] = []
failed_validation_ids: list[str] = []
validation_results: dict[str, ValidationResult] = {}
result_map: dict[str, ConversationResult] = {}

for output_id, _result, passed, vr in pipeline_results:
if isinstance(_result, ConversationResult):
result_map[output_id] = _result
if vr is None or (not vr.passed and not vr.failed_metrics):
not_finished_ids.append(output_id)
else:
Expand All @@ -290,10 +297,20 @@ async def _run_and_pipeline(
failed_this_attempt = not_finished_ids + failed_validation_ids

for oid in not_finished_ids:
# Distinguish timeout from other not_finished reasons
cr = result_map.get(oid)
is_timeout = cr is not None and cr.conversation_ended_reason == "timeout"
reason = "timeout" if is_timeout else "not_finished"
if is_timeout:
timeout_attempt_counts[oid] = timeout_attempt_counts.get(oid, 0) + 1
# Eagerly run LLM validation (skip gate) on every timeout attempt
# and cache the result for later lookup.
vr = await pipeline_validation_runner.validate_one(oid, skip_gate=True)
timeout_validation_cache.setdefault(oid, {})[attempt_number] = vr
rerun_history.setdefault(oid, []).append(
{
"attempt": attempt_number,
"reason": "not_finished",
"reason": reason,
}
)
for oid in failed_validation_ids:
Expand All @@ -310,6 +327,76 @@ async def _run_and_pipeline(
entry["failure_details"] = failure_details
rerun_history.setdefault(oid, []).append(entry)

# Check for timeout-accepted records: records that have timed out
# max_timeout_attempts times get evaluated with gate bypass.
# The current attempt was already validated eagerly above; if it passes,
# accept immediately. Otherwise, scan cached results from previous
# timeout attempts and restore the archived directory if one passes.
newly_timeout_accepted: list[str] = []
for oid in list(failed_this_attempt):
if timeout_attempt_counts.get(oid, 0) < max_timeout_attempts:
continue

cached = timeout_validation_cache.get(oid, {})
# Check current attempt first
current_vr = cached.get(attempt_number)
if current_vr and current_vr.passed:
logger.info(
f"Record {oid} timed out {timeout_attempt_counts[oid]} times, "
f"current attempt passed LLM validation — accepting"
)
self._accept_timeout_record(
oid,
failed_this_attempt,
finished_ids,
newly_timeout_accepted,
metrics_runner,
metrics_background_tasks,
)
continue

# Scan previous timeout attempts for one that passed
accepted_from_archive = False
for prev_attempt, prev_vr in cached.items():
if prev_attempt == attempt_number:
continue
if prev_vr.passed:
logger.info(
f"Record {oid} timed out {timeout_attempt_counts[oid]} times, "
f"restoring attempt {prev_attempt} which passed LLM validation"
)
# Restore the archived attempt directory
archive_dir = self.output_dir / "records" / f"{oid}_failed_attempt_{prev_attempt}"
record_dir = self.output_dir / "records" / oid
if archive_dir.exists():
# Move current attempt out of the way, restore the passing one
if record_dir.exists():
shutil.move(
str(record_dir),
str(self.output_dir / "records" / f"{oid}_failed_attempt_{attempt_number}"),
)
shutil.move(str(archive_dir), str(record_dir))
self._accept_timeout_record(
oid,
failed_this_attempt,
finished_ids,
newly_timeout_accepted,
metrics_runner,
metrics_background_tasks,
)
accepted_from_archive = True
break

if not accepted_from_archive:
logger.info(
f"Record {oid} timed out {timeout_attempt_counts[oid]} times, "
f"no attempt passed LLM validation — staying pending"
)

if newly_timeout_accepted:
timeout_accepted_ids.update(newly_timeout_accepted)
logger.info(f"{len(newly_timeout_accepted)} timeout records accepted via gate bypass")

pending_output_ids = failed_this_attempt

if not pending_output_ids:
Expand Down Expand Up @@ -415,6 +502,7 @@ async def _run_and_pipeline(
"total_attempts": attempt_number,
"failed_record_ids": sorted(final_failed_ids),
"successful_record_ids": sorted(successful_ids),
"timeout_accepted_record_ids": sorted(timeout_accepted_ids),
},
"rerun_history": rerun_history,
"final_failures": final_failures,
Expand Down Expand Up @@ -1028,6 +1116,33 @@ def settings_customise_sources(cls, settings_cls, init_settings, **kwargs):
runner.output_dir = run_dir # Use existing output dir, don't create new
return runner

def _accept_timeout_record(
    self,
    oid: str,
    failed_this_attempt: list[str],
    finished_ids: list[str],
    newly_timeout_accepted: list[str],
    metrics_runner: MetricsRunner | None,
    metrics_background_tasks: list[asyncio.Task],
) -> None:
    """Accept a timed-out record: update bookkeeping, flag result.json, schedule metrics.

    Args:
        oid: Output id of the record; also its directory name under
            ``<output_dir>/records``.
        failed_this_attempt: Mutated in place; ``oid`` is removed from it.
        finished_ids: Mutated in place; ``oid`` is appended.
        newly_timeout_accepted: Mutated in place; ``oid`` is appended.
        metrics_runner: When not None, ``run_and_save_record(oid, record_dir)``
            is scheduled as a background task. ``asyncio.create_task`` needs a
            running event loop, so call this from within a coroutine.
        metrics_background_tasks: Mutated in place; the scheduled task (if any)
            is appended so the caller can track it.

    Raises:
        ValueError: If ``oid`` is not present in ``failed_this_attempt``.
    """
    failed_this_attempt.remove(oid)
    finished_ids.append(oid)
    newly_timeout_accepted.append(oid)

    record_dir = self.output_dir / "records" / oid
    result_path = record_dir / "result.json"

    # Set the timeout_accepted flag in result.json. A missing file is skipped,
    # as before. The encoding is pinned to UTF-8 so that reading does not
    # depend on the platform's locale default.
    if result_path.exists():
        with open(result_path, encoding="utf-8") as f:
            result_data = json.load(f)
        result_data["timeout_accepted"] = True
        with open(result_path, "w", encoding="utf-8") as f:
            json.dump(result_data, f, indent=2)

    # Fire metrics if a runner is available.
    if metrics_runner is not None:
        task = asyncio.create_task(metrics_runner.run_and_save_record(oid, record_dir))
        metrics_background_tasks.append(task)

def _archive_failed_attempt(self, record_id: str, attempt_number: int) -> None:
"""Archive a failed attempt before rerunning.

Expand Down
38 changes: 22 additions & 16 deletions src/eva/orchestrator/validation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
self,
run_dir: Path,
dataset: list[EvaluationRecord],
thresholds: dict[str, float],
thresholds: dict[str, float | int],
metric_configs: dict[str, dict] | None = None,
output_ids: list[str] | None = None,
):
Expand Down Expand Up @@ -103,7 +103,7 @@ async def run_validation(self) -> dict[str, ValidationResult]:

return validation_results

async def validate_one(self, output_id: str) -> ValidationResult:
async def validate_one(self, output_id: str, *, skip_gate: bool = False) -> ValidationResult:
"""Validate a single record inline for per-record pipelining.

Runs a two-phase check matching run_validation():
Expand All @@ -118,6 +118,10 @@ async def validate_one(self, output_id: str) -> ValidationResult:

Args:
output_id: Record directory name (e.g. "1.2.1" or "1.2.1/trial_0").
skip_gate: If True, bypass the conversation_valid_end gate and run only
LLM metrics. Used for timeout-accepted records where the conversation
timed out (no goodbye event) but we still want to evaluate against
thresholds.

Returns:
ValidationResult with pass/fail details.
Expand All @@ -141,23 +145,25 @@ async def validate_one(self, output_id: str) -> ValidationResult:

record_dir = self.run_dir / "records" / output_id

# Phase 1: gate metric
gate_metrics = await self._shared_gate_runner.run_and_save_record(output_id, record_dir)
rm = gate_metrics
ms = rm.metrics.get(GATE_METRIC) if rm else None
if ms is None or ms.error:
return ValidationResult(passed=False) # empty failed_metrics = "not_finished"
score = ms.normalized_score if ms.normalized_score is not None else ms.score
if score != 1.0:
return ValidationResult(passed=False) # empty failed_metrics = "not_finished"

# Phase 2: LLM metrics (gate passed)
if not skip_gate:
# Phase 1: gate metric
gate_metrics = await self._shared_gate_runner.run_and_save_record(output_id, record_dir)
rm = gate_metrics
ms = rm.metrics.get(GATE_METRIC) if rm else None
if ms is None or ms.error:
return ValidationResult(passed=False) # empty failed_metrics = "not_finished"
score = ms.normalized_score if ms.normalized_score is not None else ms.score
if score != 1.0:
return ValidationResult(passed=False) # empty failed_metrics = "not_finished"

# Phase 2: LLM metrics (gate passed or skipped)
llm_metrics = await self._shared_llm_runner.run_and_save_record(output_id, record_dir)
if llm_metrics is None:
return ValidationResult(passed=False, failed_metrics=list(LLM_METRICS))

vr = self._evaluate_record(output_id, llm_metrics, LLM_METRICS)
vr.scores[GATE_METRIC] = 1.0
if not skip_gate:
vr.scores[GATE_METRIC] = 1.0
return vr

@staticmethod
Expand Down Expand Up @@ -226,8 +232,8 @@ def _evaluate_record(
details[metric_name] = metric_score.details
continue

threshold = self.thresholds.get(metric_name, 1.0)
if score < threshold:
threshold = float(self.thresholds.get(metric_name, 1.0))
if score is None or score < threshold:
logger.debug(
f"Record {record_id}: Metric '{metric_name}' score {score:.2f} < threshold {threshold:.2f}"
)
Expand Down
8 changes: 7 additions & 1 deletion tests/fixtures/metric_signatures.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
"source_hash": "91b71c803d77",
"version": "v0.1"
},
"ConversationTimeoutMetric": {
"name": "conversation_finished_on_time",
"prompt_hash": null,
"source_hash": "5fcfcc42ae78",
"version": "v0.1"
},
"ConversationValidEndMetric": {
"name": "conversation_valid_end",
"prompt_hash": null,
Expand Down Expand Up @@ -92,7 +98,7 @@
"UserBehavioralFidelityMetric": {
"name": "user_behavioral_fidelity",
"prompt_hash": "06477144c28e",
"source_hash": "af8144bd7731",
"source_hash": "214ede84da72",
"version": "v0.1"
},
"UserSpeechFidelityMetric": {
Expand Down
Loading
Loading