diff --git a/.env.example b/.env.example index dd78e6ef..62b9f072 100644 --- a/.env.example +++ b/.env.example @@ -172,7 +172,7 @@ EVA_MODEL__LLM=gpt-5.2 #i Conversation timeout in seconds. #d int #r 30,10000,10 -#v EVA_CONVERSATION_TIMEOUT_SECONDS=360 +#v EVA_CONVERSATION_TIMEOUT_SECONDS=600 #i Maximum rerun attempts for failed records. #d int diff --git a/src/eva/metrics/diagnostic/__init__.py b/src/eva/metrics/diagnostic/__init__.py index 687c1ef9..eb3d5e17 100644 --- a/src/eva/metrics/diagnostic/__init__.py +++ b/src/eva/metrics/diagnostic/__init__.py @@ -2,6 +2,7 @@ from . import authentication_success # noqa from . import conversation_correctly_finished # noqa +from . import conversation_timeout # noqa from . import response_speed # noqa from . import speakability # noqa from . import stt_wer # noqa @@ -11,6 +12,7 @@ __all__ = [ "authentication_success", "conversation_correctly_finished", + "conversation_timeout", "response_speed", "speakability", "stt_wer", diff --git a/src/eva/metrics/diagnostic/conversation_timeout.py b/src/eva/metrics/diagnostic/conversation_timeout.py new file mode 100644 index 00000000..75118316 --- /dev/null +++ b/src/eva/metrics/diagnostic/conversation_timeout.py @@ -0,0 +1,35 @@ +"""Conversation-timeout diagnostic metric.""" + +from eva.metrics.base import CodeMetric, MetricContext +from eva.metrics.registry import register_metric +from eva.models.results import MetricScore + + +@register_metric +class ConversationTimeoutMetric(CodeMetric): + """1.0 when the conversation finished within the time limit; 0.0 when it timed out.""" + + name = "conversation_finished_on_time" + version = "v0.1" + description = "Diagnostic metric: 1.0 when conversation finished within time limit, 0.0 on timeout" + category = "diagnostic" + exclude_from_pass_at_k = True + + async def compute(self, context: MetricContext) -> MetricScore: + try: + reason = context.conversation_ended_reason + timed_out = reason == "timeout" + score = 0.0 if timed_out else 1.0 + + return MetricScore( + name=self.name, + score=score, + normalized_score=score, + details={ + "conversation_ended_reason": reason, + "timed_out": timed_out, + }, + ) + + except Exception as e: + return self._handle_error(e, context) diff --git a/src/eva/metrics/runner.py b/src/eva/metrics/runner.py index 64c7aec2..22b87d15 100644 --- a/src/eva/metrics/runner.py +++ b/src/eva/metrics/runner.py @@ -912,7 +912,7 @@ def _compute_latency_summary(self) -> dict[str, Any]: Returns a dict with the mean of mean_ms values for each latency type that has at least one non-null entry. """ - latency_keys = ["llm_latency", "stt_latency", "tts_latency"] + latency_keys = ["llm_latency", "stt_latency", "tts_latency", "model_response_latency"] collected: dict[str, list[float]] = {k: [] for k in latency_keys} for _record_id, record_dir in self._discover_record_dirs(self.run_dir, self.record_ids): diff --git a/src/eva/metrics/validation/user_behavioral_fidelity.py b/src/eva/metrics/validation/user_behavioral_fidelity.py index 0af13816..32b96676 100644 --- a/src/eva/metrics/validation/user_behavioral_fidelity.py +++ b/src/eva/metrics/validation/user_behavioral_fidelity.py @@ -87,11 +87,20 @@ def get_prompt_variables(self, context: MetricContext, transcript_text: str) -> context.audio_timestamps_user_turns, context.audio_timestamps_assistant_turns, ) - conversation_end = ( - "the agent's failure to respond to the final user turn." - if agent_timeout - else "the user calling the end_call tool." - ) + if context.conversation_ended_reason == "timeout": + conversation_end = ( + "a system timeout — the conversation exceeded the allowed time limit. " + "The user did NOT end the call; the system terminated the conversation." + ) + elif agent_timeout: + conversation_end = "the agent's failure to respond to the final user turn." + elif context.conversation_ended_reason == "inactivity_timeout": + conversation_end = ( + "an inactivity timeout — neither the user nor the agent spoke for an extended period. " + "The user did NOT end the call; the system terminated the conversation due to silence." + ) + else: + conversation_end = "the user calling the end_call tool." return { "conversation_evidence": conversation_evidence, diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 6752d54b..2b6c5b2b 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -429,12 +429,14 @@ class ModelDeployment(DeploymentTypedDict): init=False, ) - validation_thresholds: dict[str, float] = Field( + validation_thresholds: dict[str, float | int] = Field( { "conversation_valid_end": 1.0, "user_behavioral_fidelity": 1.0, + "max_timeout_attempts": 1, }, - description="Validation metric thresholds for rerun decisions (JSON)", + description="Validation metric thresholds and settings for rerun decisions (JSON). " + "max_timeout_attempts sets the max number of attempts that timeout before accepting a run for evaluation. Default is 1.", ) # Multi-attempt (for pass@k evaluation) @@ -485,7 +487,7 @@ class ModelDeployment(DeploymentTypedDict): description="Maximum number of concurrent conversations", ) conversation_timeout_seconds: int = Field( - 360, + 600, ge=30, le=10000, description="Timeout for each conversation in seconds", diff --git a/src/eva/models/results.py b/src/eva/models/results.py index 0caa7ab6..c242398e 100644 --- a/src/eva/models/results.py +++ b/src/eva/models/results.py @@ -80,6 +80,10 @@ class ConversationResult(BaseModel): ) initial_scenario_db_hash: str | None = Field(None, description="SHA-256 hash of initial scenario database") final_scenario_db_hash: str | None = Field(None, description="SHA-256 hash of final scenario database") + timeout_accepted: bool = Field( + False, + description="Whether this record was accepted after exhausting timeout attempts (gate bypass)", + ) class MetricScore(BaseModel): diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index 17a285bb..9051d221 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -159,6 +159,10 @@ async def run(self, records: list[EvaluationRecord]) -> RunResult: all_output_ids = list(output_id_to_record.keys()) pending_output_ids = list(all_output_ids) rerun_history: dict[str, list[dict]] = {} + timeout_attempt_counts: dict[str, int] = {} + timeout_validation_cache: dict[str, dict[int, ValidationResult]] = {} + max_timeout_attempts = int(self.config.validation_thresholds.get("max_timeout_attempts", 3)) + timeout_accepted_ids: set[str] = set() started_at = datetime.now() # Initialize port pool once before the attempt loop. @@ -272,8 +276,11 @@ async def _run_and_pipeline( not_finished_ids: list[str] = [] failed_validation_ids: list[str] = [] validation_results: dict[str, ValidationResult] = {} + result_map: dict[str, ConversationResult] = {} for output_id, _result, passed, vr in pipeline_results: + if isinstance(_result, ConversationResult): + result_map[output_id] = _result if vr is None or (not vr.passed and not vr.failed_metrics): not_finished_ids.append(output_id) else: @@ -290,10 +297,20 @@ async def _run_and_pipeline( failed_this_attempt = not_finished_ids + failed_validation_ids for oid in not_finished_ids: + # Distinguish timeout from other not_finished reasons + cr = result_map.get(oid) + is_timeout = cr is not None and cr.conversation_ended_reason == "timeout" + reason = "timeout" if is_timeout else "not_finished" + if is_timeout: + timeout_attempt_counts[oid] = timeout_attempt_counts.get(oid, 0) + 1 + # Eagerly run LLM validation (skip gate) on every timeout attempt + # and cache the result for later lookup. + vr = await pipeline_validation_runner.validate_one(oid, skip_gate=True) + timeout_validation_cache.setdefault(oid, {})[attempt_number] = vr rerun_history.setdefault(oid, []).append( { "attempt": attempt_number, - "reason": "not_finished", + "reason": reason, } ) for oid in failed_validation_ids: @@ -310,6 +327,76 @@ async def _run_and_pipeline( entry["failure_details"] = failure_details rerun_history.setdefault(oid, []).append(entry) + # Check for timeout-accepted records: records that have timed out + # max_timeout_attempts times get evaluated with gate bypass. + # The current attempt was already validated eagerly above; if it passes, + # accept immediately. Otherwise, scan cached results from previous + # timeout attempts and restore the archived directory if one passes. + newly_timeout_accepted: list[str] = [] + for oid in list(failed_this_attempt): + if timeout_attempt_counts.get(oid, 0) < max_timeout_attempts: + continue + + cached = timeout_validation_cache.get(oid, {}) + # Check current attempt first + current_vr = cached.get(attempt_number) + if current_vr and current_vr.passed: + logger.info( + f"Record {oid} timed out {timeout_attempt_counts[oid]} times, " + f"current attempt passed LLM validation — accepting" + ) + self._accept_timeout_record( + oid, + failed_this_attempt, + finished_ids, + newly_timeout_accepted, + metrics_runner, + metrics_background_tasks, + ) + continue + + # Scan previous timeout attempts for one that passed + accepted_from_archive = False + for prev_attempt, prev_vr in cached.items(): + if prev_attempt == attempt_number: + continue + if prev_vr.passed: + logger.info( + f"Record {oid} timed out {timeout_attempt_counts[oid]} times, " + f"restoring attempt {prev_attempt} which passed LLM validation" + ) + # Restore the archived attempt directory + archive_dir = self.output_dir / "records" / f"{oid}_failed_attempt_{prev_attempt}" + record_dir = self.output_dir / "records" / oid + if archive_dir.exists(): + # Move current attempt out of the way, restore the passing one + if record_dir.exists(): + shutil.move( + str(record_dir), + str(self.output_dir / "records" / f"{oid}_failed_attempt_{attempt_number}"), + ) + shutil.move(str(archive_dir), str(record_dir)) + self._accept_timeout_record( + oid, + failed_this_attempt, + finished_ids, + newly_timeout_accepted, + metrics_runner, + metrics_background_tasks, + ) + accepted_from_archive = True + break + + if not accepted_from_archive: + logger.info( + f"Record {oid} timed out {timeout_attempt_counts[oid]} times, " + f"no attempt passed LLM validation — staying pending" + ) + + if newly_timeout_accepted: + timeout_accepted_ids.update(newly_timeout_accepted) + logger.info(f"{len(newly_timeout_accepted)} timeout records accepted via gate bypass") + pending_output_ids = failed_this_attempt if not pending_output_ids: @@ -415,6 +502,7 @@ async def _run_and_pipeline( "total_attempts": attempt_number, "failed_record_ids": sorted(final_failed_ids), "successful_record_ids": sorted(successful_ids), + "timeout_accepted_record_ids": sorted(timeout_accepted_ids), }, "rerun_history": rerun_history, "final_failures": final_failures, @@ -1028,6 +1116,33 @@ def settings_customise_sources(cls, settings_cls, init_settings, **kwargs): runner.output_dir = run_dir # Use existing output dir, don't create new return runner + def _accept_timeout_record( + self, + oid: str, + failed_this_attempt: list[str], + finished_ids: list[str], + newly_timeout_accepted: list[str], + metrics_runner: MetricsRunner | None, + metrics_background_tasks: list[asyncio.Task], + ) -> None: + """Accept a timeout record by updating result.json and scheduling metrics.""" + failed_this_attempt.remove(oid) + finished_ids.append(oid) + newly_timeout_accepted.append(oid) + # Update result.json with timeout_accepted flag + result_path = self.output_dir / "records" / oid / "result.json" + if result_path.exists(): + with open(result_path) as f: + result_data = json.load(f) + result_data["timeout_accepted"] = True + with open(result_path, "w") as f: + json.dump(result_data, f, indent=2) + # Fire metrics if runner available + if metrics_runner is not None: + rdir = self.output_dir / "records" / oid + task = asyncio.create_task(metrics_runner.run_and_save_record(oid, rdir)) + metrics_background_tasks.append(task) + def _archive_failed_attempt(self, record_id: str, attempt_number: int) -> None: """Archive a failed attempt before rerunning. diff --git a/src/eva/orchestrator/validation_runner.py b/src/eva/orchestrator/validation_runner.py index 259b2232..88096433 100644 --- a/src/eva/orchestrator/validation_runner.py +++ b/src/eva/orchestrator/validation_runner.py @@ -39,7 +39,7 @@ def __init__( self, run_dir: Path, dataset: list[EvaluationRecord], - thresholds: dict[str, float], + thresholds: dict[str, float | int], metric_configs: dict[str, dict] | None = None, output_ids: list[str] | None = None, ): @@ -103,7 +103,7 @@ async def run_validation(self) -> dict[str, ValidationResult]: return validation_results - async def validate_one(self, output_id: str) -> ValidationResult: + async def validate_one(self, output_id: str, *, skip_gate: bool = False) -> ValidationResult: """Validate a single record inline for per-record pipelining. Runs a two-phase check matching run_validation(): @@ -118,6 +118,10 @@ async def validate_one(self, output_id: str) -> ValidationResult: Args: output_id: Record directory name (e.g. "1.2.1" or "1.2.1/trial_0"). + skip_gate: If True, bypass the conversation_valid_end gate and run only + LLM metrics. Used for timeout-accepted records where the conversation + timed out (no goodbye event) but we still want to evaluate against + thresholds. Returns: ValidationResult with pass/fail details. @@ -141,23 +145,25 @@ async def validate_one(self, output_id: str) -> ValidationResult: record_dir = self.run_dir / "records" / output_id - # Phase 1: gate metric - gate_metrics = await self._shared_gate_runner.run_and_save_record(output_id, record_dir) - rm = gate_metrics - ms = rm.metrics.get(GATE_METRIC) if rm else None - if ms is None or ms.error: - return ValidationResult(passed=False) # empty failed_metrics = "not_finished" - score = ms.normalized_score if ms.normalized_score is not None else ms.score - if score != 1.0: - return ValidationResult(passed=False) # empty failed_metrics = "not_finished" - - # Phase 2: LLM metrics (gate passed) + if not skip_gate: + # Phase 1: gate metric + gate_metrics = await self._shared_gate_runner.run_and_save_record(output_id, record_dir) + rm = gate_metrics + ms = rm.metrics.get(GATE_METRIC) if rm else None + if ms is None or ms.error: + return ValidationResult(passed=False) # empty failed_metrics = "not_finished" + score = ms.normalized_score if ms.normalized_score is not None else ms.score + if score != 1.0: + return ValidationResult(passed=False) # empty failed_metrics = "not_finished" + + # Phase 2: LLM metrics (gate passed or skipped) llm_metrics = await self._shared_llm_runner.run_and_save_record(output_id, record_dir) if llm_metrics is None: return ValidationResult(passed=False, failed_metrics=list(LLM_METRICS)) vr = self._evaluate_record(output_id, llm_metrics, LLM_METRICS) - vr.scores[GATE_METRIC] = 1.0 + if not skip_gate: + vr.scores[GATE_METRIC] = 1.0 return vr @staticmethod @@ -226,8 +232,8 @@ def _evaluate_record( details[metric_name] = metric_score.details continue - threshold = self.thresholds.get(metric_name, 1.0) - if score < threshold: + threshold = float(self.thresholds.get(metric_name, 1.0)) + if score is None or score < threshold: logger.debug( f"Record {record_id}: Metric '{metric_name}' score {score:.2f} < threshold {threshold:.2f}" ) diff --git a/tests/fixtures/metric_signatures.json b/tests/fixtures/metric_signatures.json index fc79f392..35ef0564 100644 --- a/tests/fixtures/metric_signatures.json +++ b/tests/fixtures/metric_signatures.json @@ -35,6 +35,12 @@ "source_hash": "91b71c803d77", "version": "v0.1" }, + "ConversationTimeoutMetric": { + "name": "conversation_finished_on_time", + "prompt_hash": null, + "source_hash": "5fcfcc42ae78", + "version": "v0.1" + }, "ConversationValidEndMetric": { "name": "conversation_valid_end", "prompt_hash": null, @@ -92,7 +98,7 @@ "UserBehavioralFidelityMetric": { "name": "user_behavioral_fidelity", "prompt_hash": "06477144c28e", - "source_hash": "af8144bd7731", + "source_hash": "214ede84da72", "version": "v0.1" }, "UserSpeechFidelityMetric": { diff --git a/tests/unit/models/test_config_models.py b/tests/unit/models/test_config_models.py index 0500240f..f60d3e64 100644 --- a/tests/unit/models/test_config_models.py +++ b/tests/unit/models/test_config_models.py @@ -93,7 +93,7 @@ def test_create_minimal_config(self): # run_id = timestamp + model suffix (e.g. "2024-01-15_14-30-45.123456_nova-2_gpt-5.2_sonic") assert config.run_id.endswith("nova-2_gpt-5.2_sonic") assert config.max_concurrent_conversations == 1 - assert config.conversation_timeout_seconds == 360 + assert config.conversation_timeout_seconds == 600 def test_create_full_config(self, temp_dir: Path): """Test creating a RunConfig with all options.""" @@ -519,7 +519,7 @@ def test_defaults(self): assert c.model.stt == "deepgram" assert c.model.tts == "cartesia" assert c.max_concurrent_conversations == 1 - assert c.conversation_timeout_seconds == 360 + assert c.conversation_timeout_seconds == 600 assert c.base_port == 10000 assert c.port_pool_size == 150 assert c.max_rerun_attempts == 3