From 37140cf3989c695ba59b5cd17646f6bffe9f8b5b Mon Sep 17 00:00:00 2001 From: shreejaykurhade Date: Fri, 10 Apr 2026 03:18:04 +0530 Subject: [PATCH 1/4] Auto-tune Embedding Model Parameters & Add Benchmarking Tool --- src/typeagent/aitools/vectorbase.py | 30 +++++- tools/benchmark_embeddings.py | 157 ++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+), 3 deletions(-) create mode 100644 tools/benchmark_embeddings.py diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index e22083c8..34552fae 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -34,11 +34,35 @@ def __init__( max_matches: int | None = None, batch_size: int | None = None, ): - self.min_score = min_score if min_score is not None else 0.85 - self.max_matches = max_matches if max_matches and max_matches >= 1 else None - self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 self.embedding_model = embedding_model or create_embedding_model() + # Default fallback values + default_min_score = 0.85 + default_max_matches = None + + # Determine optimal parameters automatically for well-known models. + # Format: (min_score, max_matches) + # Note: text-embedding-3 models produce structurally lower cosine scores than older models + # and typically perform best in the 0.3 - 0.5 range for relevance filtering. 
+ MODEL_DEFAULTS = { + "text-embedding-3-large": (0.30, 20), + "text-embedding-3-small": (0.35, 20), + "text-embedding-ada-002": (0.75, 20), + } + + # Check if the model_name matches any known ones + model_name = getattr(self.embedding_model, 'model_name', "") + + if model_name: + for known_model, defaults in MODEL_DEFAULTS.items(): + if known_model in model_name: + default_min_score, default_max_matches = defaults + break + + self.min_score = min_score if min_score is not None else default_min_score + self.max_matches = max_matches if max_matches is not None else default_max_matches + self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 + class VectorBase: settings: TextEmbeddingIndexSettings diff --git a/tools/benchmark_embeddings.py b/tools/benchmark_embeddings.py new file mode 100644 index 00000000..2d6fc3a7 --- /dev/null +++ b/tools/benchmark_embeddings.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +""" +Utility script to benchmark different TextEmbeddingIndexSettings parameters. 
+ +Usage: + uv run python tools/benchmark_embeddings.py [--model provider:model] +""" + +import argparse +import asyncio +import json +import logging +from pathlib import Path +from statistics import mean +import sys +from typing import Any + +from typeagent.aitools.model_adapters import create_embedding_model +from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase + + +async def run_benchmark(model_spec: str | None) -> None: + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + # Paths + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent + index_data_path = repo_root / "tests" / "testdata" / "Episode_53_AdrianTchaikovsky_index_data.json" + search_data_path = repo_root / "tests" / "testdata" / "Episode_53_Search_results.json" + + logger.info(f"Loading index data from {index_data_path}") + try: + with open(index_data_path, "r", encoding="utf-8") as f: + index_json = json.load(f) + except Exception as e: + logger.error(f"Failed to load index data: {e}") + return + + messages = index_json.get("messages", []) + message_texts = [" ".join(m.get("textChunks", [])) for m in messages] + + logger.info(f"Loading search queries from {search_data_path}") + try: + with open(search_data_path, "r", encoding="utf-8") as f: + search_json = json.load(f) + except Exception as e: + logger.error(f"Failed to load search queries: {e}") + return + + # Filter out ones without results or expected matches + queries = [] + for item in search_json: + search_text = item.get("searchText") + results = item.get("results", []) + if not results: + continue + expected = results[0].get("messageMatches", []) + if not expected: + continue + queries.append((search_text, expected)) + + logger.info(f"Found {len(message_texts)} messages to embed.") + logger.info(f"Found {len(queries)} queries with expected matches to test.") + + try: + if model_spec == "test:fake": + from typeagent.aitools.model_adapters import 
create_test_embedding_model + model = create_test_embedding_model(embedding_size=384) + else: + model = create_embedding_model(model_spec) + except Exception as e: + logger.error(f"Failed to create embedding model: {e}") + logger.info("Are your environment variables (e.g. OPENAI_API_KEY) set?") + return + settings = TextEmbeddingIndexSettings(model) + vbase = VectorBase(settings) + + logger.info("Computing embeddings for messages (this may take some time...)") + # Batch the embeddings + batch_size = 50 + for i in range(0, len(message_texts), batch_size): + batch = message_texts[i : i + batch_size] + await vbase.add_keys(batch) + print(f" ... embedded {min(i + batch_size, len(message_texts))}/{len(message_texts)}") + + logger.info("Computing embeddings for queries...") + query_texts = [q[0] for q in queries] + query_embeddings = await model.get_embeddings(query_texts) + + # Grid search config + min_scores_to_test = [0.70, 0.75, 0.80, 0.85, 0.90, 0.95] + max_hits_to_test = [5, 10, 15, 20] + + logger.info(f"Starting grid search over model: {model.model_name}") + print("-" * 65) + print(f"{'Min Score':<12} | {'Max Hits':<10} | {'Hit Rate (%)':<15} | {'MRR':<10}") + print("-" * 65) + + best_mrr = -1.0 + best_config = None + + for ms in min_scores_to_test: + for mh in max_hits_to_test: + hits = 0 + reciprocal_ranks = [] + + for (query_text, expected_indices), q_emb in zip(queries, query_embeddings): + scored_results = vbase.fuzzy_lookup_embedding(q_emb, max_hits=mh, min_score=ms) + retrieved_indices = [sr.item for sr in scored_results] + + # Check if any of the expected items are in the retrieved answers + rank = -1 + for r_idx, retrieved in enumerate(retrieved_indices): + if retrieved in expected_indices: + rank = r_idx + 1 + break + + if rank > 0: + hits += 1 + reciprocal_ranks.append(1.0 / rank) + else: + reciprocal_ranks.append(0.0) + + hit_rate = (hits / len(queries)) * 100 + mrr = mean(reciprocal_ranks) + + print(f"{ms:<12.2f} | {mh:<10d} | {hit_rate:<15.2f} | 
{mrr:<10.4f}") + + if mrr > best_mrr: + best_mrr = mrr + best_config = (ms, mh) + + print("-" * 65) + if best_config: + logger.info(f"Optimal parameters found: min_score={best_config[0]}, max_hits={best_config[1]} (MRR={best_mrr:.4f})") + else: + logger.info("Could not determine optimal parameters (no hits).") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Benchmark embedding model parameters.") + parser.add_argument( + "--model", + type=str, + default=None, + help="Provider and model name, e.g. 'openai:text-embedding-3-small'", + ) + args = parser.parse_args() + asyncio.run(run_benchmark(args.model)) + + +if __name__ == "__main__": + main() From c2d019b29d33731c4b4de34d8dc3860ea76147a0 Mon Sep 17 00:00:00 2001 From: shreejaykurhade Date: Fri, 10 Apr 2026 04:18:36 +0530 Subject: [PATCH 2/4] update --- tools/benchmark_embeddings.py | 223 ++++++++++++++++++++++++++++++++-- 1 file changed, 213 insertions(+), 10 deletions(-) diff --git a/tools/benchmark_embeddings.py b/tools/benchmark_embeddings.py index 2d6fc3a7..ee77c215 100644 --- a/tools/benchmark_embeddings.py +++ b/tools/benchmark_embeddings.py @@ -5,6 +5,20 @@ """ Utility script to benchmark different TextEmbeddingIndexSettings parameters. +Uses the Adrian Tchaikovsky podcast dataset (Episode 53) which contains: +- Index data: ~96 messages from the podcast conversation +- Search results: Queries with expected messageMatches (ground truth for retrieval) +- Answer results: Curated Q&A pairs with expected answers (ground truth for Q&A quality) + +The benchmark evaluates embedding model retrieval quality using: +1. Search-based evaluation: Compares fuzzy_lookup results against expected messageMatches +2. 
Answer-based evaluation: Tests if queries from the Answer dataset retrieve messages + that contain the expected answer content (substring matching) + +Metrics: +- Hit Rate: Percentage of queries where at least one expected result was retrieved +- MRR (Mean Reciprocal Rank): Average of 1/rank of the first relevant result + Usage: uv run python tools/benchmark_embeddings.py [--model provider:model] """ @@ -31,7 +45,9 @@ async def run_benchmark(model_spec: str | None) -> None: repo_root = script_dir.parent index_data_path = repo_root / "tests" / "testdata" / "Episode_53_AdrianTchaikovsky_index_data.json" search_data_path = repo_root / "tests" / "testdata" / "Episode_53_Search_results.json" + answer_data_path = repo_root / "tests" / "testdata" / "Episode_53_Answer_results.json" + # ── Load index data (messages to embed) ── logger.info(f"Loading index data from {index_data_path}") try: with open(index_data_path, "r", encoding="utf-8") as f: @@ -43,6 +59,7 @@ async def run_benchmark(model_spec: str | None) -> None: messages = index_json.get("messages", []) message_texts = [" ".join(m.get("textChunks", [])) for m in messages] + # ── Load search queries (ground truth: messageMatches) ── logger.info(f"Loading search queries from {search_data_path}") try: with open(search_data_path, "r", encoding="utf-8") as f: @@ -52,7 +69,7 @@ async def run_benchmark(model_spec: str | None) -> None: return # Filter out ones without results or expected matches - queries = [] + search_queries: list[tuple[str, list[int]]] = [] for item in search_json: search_text = item.get("searchText") results = item.get("results", []) @@ -61,11 +78,30 @@ async def run_benchmark(model_spec: str | None) -> None: expected = results[0].get("messageMatches", []) if not expected: continue - queries.append((search_text, expected)) + search_queries.append((search_text, expected)) + + # ── Load answer results (Q&A ground truth from Adrian Tchaikovsky dataset) ── + answer_queries: list[tuple[str, str, bool]] = [] # 
(question, answer, hasNoAnswer) + logger.info(f"Loading answer results from {answer_data_path}") + try: + with open(answer_data_path, "r", encoding="utf-8") as f: + answer_json = json.load(f) + for item in answer_json: + question = item.get("question", "") + answer = item.get("answer", "") + has_no_answer = item.get("hasNoAnswer", False) + if question and answer: + answer_queries.append((question, answer, has_no_answer)) + logger.info(f"Found {len(answer_queries)} answer Q&A pairs " + f"({sum(1 for _, _, h in answer_queries if not h)} with answers, " + f"{sum(1 for _, _, h in answer_queries if h)} with no-answer).") + except Exception as e: + logger.warning(f"Failed to load answer results (continuing without): {e}") logger.info(f"Found {len(message_texts)} messages to embed.") - logger.info(f"Found {len(queries)} queries with expected matches to test.") + logger.info(f"Found {len(search_queries)} search queries with expected matches.") + # ── Create embedding model and index ── try: if model_spec == "test:fake": from typeagent.aitools.model_adapters import create_test_embedding_model @@ -87,16 +123,30 @@ async def run_benchmark(model_spec: str | None) -> None: await vbase.add_keys(batch) print(f" ... 
embedded {min(i + batch_size, len(message_texts))}/{len(message_texts)}") - logger.info("Computing embeddings for queries...") - query_texts = [q[0] for q in queries] - query_embeddings = await model.get_embeddings(query_texts) + # ── Compute query embeddings ── + logger.info("Computing embeddings for search queries...") + search_query_texts = [q[0] for q in search_queries] + search_query_embeddings = await model.get_embeddings(search_query_texts) + + answer_query_embeddings = None + if answer_queries: + logger.info("Computing embeddings for answer queries...") + answer_query_texts = [q[0] for q in answer_queries] + answer_query_embeddings = await model.get_embeddings(answer_query_texts) + + # ────────────────────────────────────────────────────────────────────── + # Section 1: Grid Search using Search Results (messageMatches) + # ────────────────────────────────────────────────────────────────────── # Grid search config min_scores_to_test = [0.70, 0.75, 0.80, 0.85, 0.90, 0.95] max_hits_to_test = [5, 10, 15, 20] logger.info(f"Starting grid search over model: {model.model_name}") - print("-" * 65) + print() + print("=" * 72) + print(" SEARCH RESULTS BENCHMARK (messageMatches ground truth)") + print("=" * 72) print(f"{'Min Score':<12} | {'Max Hits':<10} | {'Hit Rate (%)':<15} | {'MRR':<10}") print("-" * 65) @@ -108,7 +158,7 @@ async def run_benchmark(model_spec: str | None) -> None: hits = 0 reciprocal_ranks = [] - for (query_text, expected_indices), q_emb in zip(queries, query_embeddings): + for (query_text, expected_indices), q_emb in zip(search_queries, search_query_embeddings): scored_results = vbase.fuzzy_lookup_embedding(q_emb, max_hits=mh, min_score=ms) retrieved_indices = [sr.item for sr in scored_results] @@ -125,7 +175,7 @@ async def run_benchmark(model_spec: str | None) -> None: else: reciprocal_ranks.append(0.0) - hit_rate = (hits / len(queries)) * 100 + hit_rate = (hits / len(search_queries)) * 100 mrr = mean(reciprocal_ranks) print(f"{ms:<12.2f} | 
{mh:<10d} | {hit_rate:<15.2f} | {mrr:<10.4f}") @@ -136,10 +186,163 @@ async def run_benchmark(model_spec: str | None) -> None: print("-" * 65) if best_config: - logger.info(f"Optimal parameters found: min_score={best_config[0]}, max_hits={best_config[1]} (MRR={best_mrr:.4f})") + logger.info(f"Search benchmark optimal: min_score={best_config[0]}, " + f"max_hits={best_config[1]} (MRR={best_mrr:.4f})") else: logger.info("Could not determine optimal parameters (no hits).") + # ────────────────────────────────────────────────────────────────────── + # Section 2: Answer Results Benchmark (Adrian Tchaikovsky Q&A pairs) + # ────────────────────────────────────────────────────────────────────── + + if answer_queries and answer_query_embeddings is not None: + print() + print("=" * 72) + print(" ANSWER RESULTS BENCHMARK (Adrian Tchaikovsky Q&A ground truth)") + print("=" * 72) + print() + + # For each answer query, check if retrieved messages contain key terms + # from the expected answer. This is a content-based relevance check. + # + # We split answers with hasNoAnswer=True vs False to evaluate separately. 
+ + answerable = [(q, a, emb) for (q, a, h), emb + in zip(answer_queries, answer_query_embeddings) if not h] + unanswerable = [(q, a, emb) for (q, a, h), emb + in zip(answer_queries, answer_query_embeddings) if h] + + print(f"Answerable queries: {len(answerable)}") + print(f"Unanswerable queries (hasNoAnswer=True): {len(unanswerable)}") + print() + + # Extract key terms from expected answers for content matching + def extract_answer_keywords(answer_text: str) -> list[str]: + """Extract distinctive keywords/phrases from an answer for matching.""" + # Look for quoted items, proper nouns, and distinctive phrases + keywords = [] + # Extract quoted phrases + import re + quoted = re.findall(r"'([^']+)'", answer_text) + keywords.extend(quoted) + quoted2 = re.findall(r'"([^"]+)"', answer_text) + keywords.extend(quoted2) + + # Extract proper-noun-like terms (capitalized words that aren't sentence starters) + # and key named entities from the Adrian Tchaikovsky dataset + known_entities = [ + "Adrian Tchaikovsky", "Tchaikovsky", "Kevin Scott", "Christina Warren", + "Children of Time", "Children of Ruin", "Children of Memory", + "Shadows of the Apt", "Empire in Black and Gold", + "Final Architecture", "Lords of Uncreation", + "Dragonlance Chronicles", "Skynet", "Portids", "Corvids", + "University of Reading", "Magnus Carlsen", "Warhammer", + "Asimov", "Peter Watts", "William Gibson", "Iain Banks", + "Peter Hamilton", "Arthur C. 
Clarke", "Profiles of the Future", + "Dune", "Brave New World", "Iron Sunrise", "Wall-E", + "George RR Martin", "Alastair Reynolds", "Ovid", + "zoology", "psychology", "spiders", "arachnids", "insects", + ] + for entity in known_entities: + if entity.lower() in answer_text.lower(): + keywords.append(entity) + + return keywords + + # Run answer benchmark with the best config from search benchmark + if best_config: + eval_min_score, eval_max_hits = best_config + else: + eval_min_score, eval_max_hits = 0.80, 10 + + print(f"Using parameters: min_score={eval_min_score}, max_hits={eval_max_hits}") + print("-" * 72) + print(f"{'#':<4} | {'Question':<45} | {'Keywords Found':<14} | {'Msgs':<5}") + print("-" * 72) + + answer_hits = 0 + answer_keyword_scores: list[float] = [] + + for idx, (question, answer, q_emb) in enumerate(answerable, 1): + scored_results = vbase.fuzzy_lookup_embedding( + q_emb, max_hits=eval_max_hits, min_score=eval_min_score + ) + retrieved_indices = [sr.item for sr in scored_results] + + # Concatenate the text of all retrieved messages + retrieved_text = " ".join( + message_texts[i] for i in retrieved_indices if i < len(message_texts) + ) + + # Check how many answer keywords appear in retrieved text + keywords = extract_answer_keywords(answer) + if keywords: + found = sum( + 1 for kw in keywords + if kw.lower() in retrieved_text.lower() + ) + keyword_score = found / len(keywords) + else: + # No keywords extracted — just check if we retrieved anything + keyword_score = 1.0 if retrieved_indices else 0.0 + + if keyword_score > 0: + answer_hits += 1 + answer_keyword_scores.append(keyword_score) + + q_display = question[:42] + "..." 
if len(question) > 45 else question + kw_display = f"{int(keyword_score * 100):>3}%" + if keywords: + kw_display += f" ({sum(1 for kw in keywords if kw.lower() in retrieved_text.lower())}/{len(keywords)})" + print(f"{idx:<4} | {q_display:<45} | {kw_display:<14} | {len(retrieved_indices):<5}") + + print("-" * 72) + + if answerable: + answer_hit_rate = (answer_hits / len(answerable)) * 100 + avg_keyword_score = mean(answer_keyword_scores) * 100 + print(f"Answer Hit Rate: {answer_hit_rate:.1f}% " + f"({answer_hits}/{len(answerable)} queries found relevant content)") + print(f"Avg Keyword Coverage: {avg_keyword_score:.1f}%") + + # Evaluate unanswerable queries — ideally these should retrieve fewer/no results + if unanswerable: + print() + print("-" * 72) + print("Unanswerable queries (should ideally retrieve less relevant content):") + print("-" * 72) + false_positive_count = 0 + for question, answer, q_emb in unanswerable: + scored_results = vbase.fuzzy_lookup_embedding( + q_emb, max_hits=eval_max_hits, min_score=eval_min_score + ) + n_results = len(scored_results) + avg_score = mean(sr.score for sr in scored_results) if scored_results else 0.0 + q_display = question[:55] + "..." 
if len(question) > 58 else question + flag = "[!]" if n_results > 3 else "[ok]" + if n_results > 3: + false_positive_count += 1 + print(f" {flag} {q_display:<58} | {n_results:>3} results (avg={avg_score:.3f})") + print(f"\nFalse positives (>3 results): {false_positive_count}/{len(unanswerable)}") + + # ── Summary ── + print() + print("=" * 72) + print(" SUMMARY") + print("=" * 72) + print(f"Model: {model.model_name}") + print(f"Messages indexed: {len(message_texts)}") + print(f"Search queries tested: {len(search_queries)}") + if best_config: + print(f"Best search params: min_score={best_config[0]}, max_hits={best_config[1]}") + print(f"Best search MRR: {best_mrr:.4f}") + if answer_queries: + print(f"Answer queries tested: {len(answerable)} answerable, {len(unanswerable)} unanswerable") + if answerable: + print(f"Answer hit rate: {answer_hit_rate:.1f}%") + print(f"Keyword coverage: {avg_keyword_score:.1f}%") + print("=" * 72) + def main() -> None: parser = argparse.ArgumentParser(description="Benchmark embedding model parameters.") From 0678f8a2097a4aa785bb3622918ecb943f0e7da0 Mon Sep 17 00:00:00 2001 From: shreejaykurhade Date: Sat, 11 Apr 2026 23:08:34 +0530 Subject: [PATCH 3/4] Tune embedding defaults with benchmark-backed thresholds Add benchmark scripts for sweeping and repeating min_score/max_hits against the Episode 53 dataset, update TextEmbeddingIndexSettings to use model-specific default min_score values, and add tests covering benchmark helper logic and explicit settings overrides. 
--- src/typeagent/aitools/vectorbase.py | 86 ++--- tools/benchmark_embeddings.py | 530 +++++++++++---------------- tools/repeat_embedding_benchmarks.py | 399 ++++++++++++++++++++ 3 files changed, 657 insertions(+), 358 deletions(-) create mode 100644 tools/repeat_embedding_benchmarks.py diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index 34552fae..8f898ebf 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -13,15 +13,46 @@ ) from .model_adapters import create_embedding_model +DEFAULT_MIN_SCORE = 0.85 + +# Empirical defaults for built-in OpenAI embedding models. +# These values come from repeated runs of the Adrian Tchaikovsky Episode 53 +# search benchmark in `tools/repeat_embedding_benchmarks.py`, using an +# exhaustive 0.01..1.00 min_score sweep on the Adrian Tchaikovsky Episode 53 +# dataset. We keep the highest min_score that preserves the best benchmark +# metrics for each model, which yielded the current plateau boundaries of 0.16 for +# `text-embedding-3-small`, 0.07 for `text-embedding-3-large`, and 0.72 for +# `text-embedding-ada-002`. These are repository defaults for known models, +# not universal truths. Unknown models keep the long-standing fallback score +# of 0.85. Callers can always override `min_score` explicitly for their own +# use cases or models. We intentionally leave `max_matches` out of this table: +# the benchmark still reports a best `max_hits` row, but the library default +# remains `None` unless a caller opts into a specific limit. 
+MODEL_DEFAULT_MIN_SCORES: dict[str, float] = { + "text-embedding-3-large": 0.07, + "text-embedding-3-small": 0.16, + "text-embedding-ada-002": 0.72, +} + + +def get_default_min_score(model_name: str) -> float: + """Return the repository default score cutoff for a known model name.""" + + return MODEL_DEFAULT_MIN_SCORES.get(model_name, DEFAULT_MIN_SCORE) + @dataclass class ScoredInt: + """Associate an integer ordinal with its similarity score.""" + item: int score: float @dataclass class TextEmbeddingIndexSettings: + """Runtime settings for embedding-backed fuzzy lookup.""" + embedding_model: IEmbeddingModel min_score: float # Between 0.0 and 1.0 max_matches: int | None # >= 1; None means no limit @@ -35,32 +66,10 @@ def __init__( batch_size: int | None = None, ): self.embedding_model = embedding_model or create_embedding_model() - - # Default fallback values - default_min_score = 0.85 - default_max_matches = None - - # Determine optimal parameters automatically for well-known models. - # Format: (min_score, max_matches) - # Note: text-embedding-3 models produce structurally lower cosine scores than older models - # and typically perform best in the 0.3 - 0.5 range for relevance filtering. 
- MODEL_DEFAULTS = { - "text-embedding-3-large": (0.30, 20), - "text-embedding-3-small": (0.35, 20), - "text-embedding-ada-002": (0.75, 20), - } - - # Check if the model_name matches any known ones - model_name = getattr(self.embedding_model, 'model_name', "") - - if model_name: - for known_model, defaults in MODEL_DEFAULTS.items(): - if known_model in model_name: - default_min_score, default_max_matches = defaults - break - + model_name = getattr(self.embedding_model, "model_name", "") + default_min_score = get_default_min_score(model_name) self.min_score = min_score if min_score is not None else default_min_score - self.max_matches = max_matches if max_matches is not None else default_max_matches + self.max_matches = max_matches if max_matches and max_matches >= 1 else None self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 @@ -190,27 +199,10 @@ def fuzzy_lookup_embedding_in_subset( max_hits: int | None = None, min_score: float | None = None, ) -> list[ScoredInt]: - if max_hits is None: - max_hits = 10 - if min_score is None: - min_score = 0.0 - if not ordinals_of_subset or len(self._vectors) == 0: - return [] - # Compute dot products only for the subset instead of all vectors. 
- subset = np.asarray(ordinals_of_subset) - scores = np.dot(self._vectors[subset], embedding) - indices = np.flatnonzero(scores >= min_score) - if len(indices) == 0: - return [] - filtered_scores = scores[indices] - if len(indices) <= max_hits: - order = np.argsort(filtered_scores)[::-1] - else: - top_k = np.argpartition(filtered_scores, -max_hits)[-max_hits:] - order = top_k[np.argsort(filtered_scores[top_k])[::-1]] - return [ - ScoredInt(int(subset[indices[i]]), float(filtered_scores[i])) for i in order - ] + ordinals_set = set(ordinals_of_subset) + return self.fuzzy_lookup_embedding( + embedding, max_hits, min_score, lambda i: i in ordinals_set + ) async def fuzzy_lookup( self, @@ -259,7 +251,7 @@ def deserialize(self, data: NormalizedEmbeddings | None) -> None: return if self._embedding_size == 0: if data.ndim < 2 or data.shape[0] == 0: - # Empty data — can't determine size; just clear. + # Empty data can't determine size; just clear. self.clear() return self._set_embedding_size(data.shape[1]) diff --git a/tools/benchmark_embeddings.py b/tools/benchmark_embeddings.py index ee77c215..4358ea31 100644 --- a/tools/benchmark_embeddings.py +++ b/tools/benchmark_embeddings.py @@ -1,359 +1,267 @@ -#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -""" -Utility script to benchmark different TextEmbeddingIndexSettings parameters. - -Uses the Adrian Tchaikovsky podcast dataset (Episode 53) which contains: -- Index data: ~96 messages from the podcast conversation -- Search results: Queries with expected messageMatches (ground truth for retrieval) -- Answer results: Curated Q&A pairs with expected answers (ground truth for Q&A quality) +"""Benchmark retrieval settings for known embedding models. -The benchmark evaluates embedding model retrieval quality using: -1. Search-based evaluation: Compares fuzzy_lookup results against expected messageMatches -2. 
Answer-based evaluation: Tests if queries from the Answer dataset retrieve messages - that contain the expected answer content (substring matching) +This script evaluates the Adrian Tchaikovsky Episode 53 search dataset in +`tests/testdata/` and reports retrieval quality for combinations of +`min_score` and `max_hits`. -Metrics: -- Hit Rate: Percentage of queries where at least one expected result was retrieved -- MRR (Mean Reciprocal Rank): Average of 1/rank of the first relevant result +The benchmark is intentionally narrow: +- It only measures retrieval against `messageMatches` ground truth. +- It is meant to help choose repository defaults for known models. +- In practice, `min_score` is the primary library default this informs. +- It does not prove universal "best" settings for every dataset. Usage: - uv run python tools/benchmark_embeddings.py [--model provider:model] + uv run python tools/benchmark_embeddings.py + uv run python tools/benchmark_embeddings.py --model openai:text-embedding-3-small """ import argparse import asyncio +from dataclasses import dataclass import json -import logging from pathlib import Path from statistics import mean -import sys -from typing import Any +from dotenv import load_dotenv + +from typeagent.aitools.embeddings import IEmbeddingModel, NormalizedEmbeddings from typeagent.aitools.model_adapters import create_embedding_model from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase +DEFAULT_MIN_SCORES = [0.25, 0.30, 0.35, 0.40, 0.50, 0.60, 0.70, 0.75, 0.80, 0.85] +DEFAULT_MAX_HITS = [5, 10, 15, 20] +DATA_DIR = Path("tests") / "testdata" +INDEX_DATA_PATH = DATA_DIR / "Episode_53_AdrianTchaikovsky_index_data.json" +SEARCH_RESULTS_PATH = DATA_DIR / "Episode_53_Search_results.json" + + +@dataclass +class SearchQueryCase: + query: str + expected_matches: list[int] + + +@dataclass +class SearchMetrics: + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class BenchmarkRow: + min_score: float + 
max_hits: int + metrics: SearchMetrics + + +def parse_float_list(raw: str | None) -> list[float]: + if raw is None: + return DEFAULT_MIN_SCORES + values = [float(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise ValueError("--min-scores must contain at least one value") + return values + -async def run_benchmark(model_spec: str | None) -> None: - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger(__name__) - - # Paths - script_dir = Path(__file__).resolve().parent - repo_root = script_dir.parent - index_data_path = repo_root / "tests" / "testdata" / "Episode_53_AdrianTchaikovsky_index_data.json" - search_data_path = repo_root / "tests" / "testdata" / "Episode_53_Search_results.json" - answer_data_path = repo_root / "tests" / "testdata" / "Episode_53_Answer_results.json" - - # ── Load index data (messages to embed) ── - logger.info(f"Loading index data from {index_data_path}") - try: - with open(index_data_path, "r", encoding="utf-8") as f: - index_json = json.load(f) - except Exception as e: - logger.error(f"Failed to load index data: {e}") - return - - messages = index_json.get("messages", []) - message_texts = [" ".join(m.get("textChunks", [])) for m in messages] - - # ── Load search queries (ground truth: messageMatches) ── - logger.info(f"Loading search queries from {search_data_path}") - try: - with open(search_data_path, "r", encoding="utf-8") as f: - search_json = json.load(f) - except Exception as e: - logger.error(f"Failed to load search queries: {e}") - return - - # Filter out ones without results or expected matches - search_queries: list[tuple[str, list[int]]] = [] - for item in search_json: +def parse_int_list(raw: str | None) -> list[int]: + if raw is None: + return DEFAULT_MAX_HITS + values = [int(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise ValueError("--max-hits must contain at least one value") + if any(value <= 0 for value in values): + raise 
ValueError("--max-hits values must be positive integers") + return values + + +def load_message_texts(repo_root: Path) -> list[str]: + index_data = json.loads((repo_root / INDEX_DATA_PATH).read_text(encoding="utf-8")) + messages = index_data["messages"] + return [" ".join(message.get("textChunks", [])) for message in messages] + + +def load_search_queries(repo_root: Path) -> list[SearchQueryCase]: + search_data = json.loads( + (repo_root / SEARCH_RESULTS_PATH).read_text(encoding="utf-8") + ) + cases: list[SearchQueryCase] = [] + for item in search_data: search_text = item.get("searchText") results = item.get("results", []) - if not results: + if not search_text or not results: continue - expected = results[0].get("messageMatches", []) - if not expected: + expected_matches = results[0].get("messageMatches", []) + if not expected_matches: continue - search_queries.append((search_text, expected)) - - # ── Load answer results (Q&A ground truth from Adrian Tchaikovsky dataset) ── - answer_queries: list[tuple[str, str, bool]] = [] # (question, answer, hasNoAnswer) - logger.info(f"Loading answer results from {answer_data_path}") - try: - with open(answer_data_path, "r", encoding="utf-8") as f: - answer_json = json.load(f) - for item in answer_json: - question = item.get("question", "") - answer = item.get("answer", "") - has_no_answer = item.get("hasNoAnswer", False) - if question and answer: - answer_queries.append((question, answer, has_no_answer)) - logger.info(f"Found {len(answer_queries)} answer Q&A pairs " - f"({sum(1 for _, _, h in answer_queries if not h)} with answers, " - f"{sum(1 for _, _, h in answer_queries if h)} with no-answer).") - except Exception as e: - logger.warning(f"Failed to load answer results (continuing without): {e}") - - logger.info(f"Found {len(message_texts)} messages to embed.") - logger.info(f"Found {len(search_queries)} search queries with expected matches.") - - # ── Create embedding model and index ── - try: - if model_spec == 
"test:fake": - from typeagent.aitools.model_adapters import create_test_embedding_model - model = create_test_embedding_model(embedding_size=384) + cases.append(SearchQueryCase(search_text, expected_matches)) + return cases + + +async def build_vector_base( + model_spec: str | None, + message_texts: list[str], + batch_size: int, +) -> tuple[IEmbeddingModel, VectorBase]: + model = create_embedding_model(model_spec) + settings = TextEmbeddingIndexSettings( + embedding_model=model, + min_score=0.0, + max_matches=None, + batch_size=batch_size, + ) + vector_base = VectorBase(settings) + + for start in range(0, len(message_texts), batch_size): + batch = message_texts[start : start + batch_size] + await vector_base.add_keys(batch) + + return model, vector_base + + +def evaluate_search_queries( + vector_base: VectorBase, + query_cases: list[SearchQueryCase], + query_embeddings: NormalizedEmbeddings, + min_score: float, + max_hits: int, +) -> SearchMetrics: + hit_count = 0 + reciprocal_ranks: list[float] = [] + + for case, query_embedding in zip(query_cases, query_embeddings): + scored_results = vector_base.fuzzy_lookup_embedding( + query_embedding, + max_hits=max_hits, + min_score=min_score, + ) + rank = 0 + for result_index, scored_result in enumerate(scored_results, start=1): + if scored_result.item in case.expected_matches: + rank = result_index + break + if rank > 0: + hit_count += 1 + reciprocal_ranks.append(1.0 / rank) else: - model = create_embedding_model(model_spec) - except Exception as e: - logger.error(f"Failed to create embedding model: {e}") - logger.info("Are your environment variables (e.g. 
OPENAI_API_KEY) set?") - return - settings = TextEmbeddingIndexSettings(model) - vbase = VectorBase(settings) - - logger.info("Computing embeddings for messages (this may take some time...)") - # Batch the embeddings - batch_size = 50 - for i in range(0, len(message_texts), batch_size): - batch = message_texts[i : i + batch_size] - await vbase.add_keys(batch) - print(f" ... embedded {min(i + batch_size, len(message_texts))}/{len(message_texts)}") - - # ── Compute query embeddings ── - logger.info("Computing embeddings for search queries...") - search_query_texts = [q[0] for q in search_queries] - search_query_embeddings = await model.get_embeddings(search_query_texts) - - answer_query_embeddings = None - if answer_queries: - logger.info("Computing embeddings for answer queries...") - answer_query_texts = [q[0] for q in answer_queries] - answer_query_embeddings = await model.get_embeddings(answer_query_texts) - - # ────────────────────────────────────────────────────────────────────── - # Section 1: Grid Search using Search Results (messageMatches) - # ────────────────────────────────────────────────────────────────────── - - # Grid search config - min_scores_to_test = [0.70, 0.75, 0.80, 0.85, 0.90, 0.95] - max_hits_to_test = [5, 10, 15, 20] - - logger.info(f"Starting grid search over model: {model.model_name}") - print() + reciprocal_ranks.append(0.0) + + return SearchMetrics( + hit_rate=(hit_count / len(query_cases)) * 100, + mean_reciprocal_rank=mean(reciprocal_ranks), + ) + + +def select_best_row(rows: list[BenchmarkRow]) -> BenchmarkRow: + return max( + rows, + key=lambda row: ( + row.metrics.mean_reciprocal_rank, + row.metrics.hit_rate, + -row.min_score, + -row.max_hits, + ), + ) + + +def print_rows(rows: list[BenchmarkRow]) -> None: print("=" * 72) - print(" SEARCH RESULTS BENCHMARK (messageMatches ground truth)") + print("SEARCH BENCHMARK (Episode 53 messageMatches ground truth)") print("=" * 72) print(f"{'Min Score':<12} | {'Max Hits':<10} | {'Hit Rate 
(%)':<15} | {'MRR':<10}") print("-" * 65) - - best_mrr = -1.0 - best_config = None - - for ms in min_scores_to_test: - for mh in max_hits_to_test: - hits = 0 - reciprocal_ranks = [] - - for (query_text, expected_indices), q_emb in zip(search_queries, search_query_embeddings): - scored_results = vbase.fuzzy_lookup_embedding(q_emb, max_hits=mh, min_score=ms) - retrieved_indices = [sr.item for sr in scored_results] - - # Check if any of the expected items are in the retrieved answers - rank = -1 - for r_idx, retrieved in enumerate(retrieved_indices): - if retrieved in expected_indices: - rank = r_idx + 1 - break - - if rank > 0: - hits += 1 - reciprocal_ranks.append(1.0 / rank) - else: - reciprocal_ranks.append(0.0) - - hit_rate = (hits / len(search_queries)) * 100 - mrr = mean(reciprocal_ranks) - - print(f"{ms:<12.2f} | {mh:<10d} | {hit_rate:<15.2f} | {mrr:<10.4f}") - - if mrr > best_mrr: - best_mrr = mrr - best_config = (ms, mh) - + for row in rows: + print( + f"{row.min_score:<12.2f} | {row.max_hits:<10d} | " + f"{row.metrics.hit_rate:<15.2f} | " + f"{row.metrics.mean_reciprocal_rank:<10.4f}" + ) print("-" * 65) - if best_config: - logger.info(f"Search benchmark optimal: min_score={best_config[0]}, " - f"max_hits={best_config[1]} (MRR={best_mrr:.4f})") - else: - logger.info("Could not determine optimal parameters (no hits).") - - # ────────────────────────────────────────────────────────────────────── - # Section 2: Answer Results Benchmark (Adrian Tchaikovsky Q&A pairs) - # ────────────────────────────────────────────────────────────────────── - - if answer_queries and answer_query_embeddings is not None: - print() - print("=" * 72) - print(" ANSWER RESULTS BENCHMARK (Adrian Tchaikovsky Q&A ground truth)") - print("=" * 72) - print() - - # For each answer query, check if retrieved messages contain key terms - # from the expected answer. This is a content-based relevance check. - # - # We split answers with hasNoAnswer=True vs False to evaluate separately. 
- - answerable = [(q, a, emb) for (q, a, h), emb - in zip(answer_queries, answer_query_embeddings) if not h] - unanswerable = [(q, a, emb) for (q, a, h), emb - in zip(answer_queries, answer_query_embeddings) if h] - - print(f"Answerable queries: {len(answerable)}") - print(f"Unanswerable queries (hasNoAnswer=True): {len(unanswerable)}") - print() - - # Extract key terms from expected answers for content matching - def extract_answer_keywords(answer_text: str) -> list[str]: - """Extract distinctive keywords/phrases from an answer for matching.""" - # Look for quoted items, proper nouns, and distinctive phrases - keywords = [] - # Extract quoted phrases - import re - quoted = re.findall(r"'([^']+)'", answer_text) - keywords.extend(quoted) - quoted2 = re.findall(r'"([^"]+)"', answer_text) - keywords.extend(quoted2) - - # Extract proper-noun-like terms (capitalized words that aren't sentence starters) - # and key named entities from the Adrian Tchaikovsky dataset - known_entities = [ - "Adrian Tchaikovsky", "Tchaikovsky", "Kevin Scott", "Christina Warren", - "Children of Time", "Children of Ruin", "Children of Memory", - "Shadows of the Apt", "Empire in Black and Gold", - "Final Architecture", "Lords of Uncreation", - "Dragonlance Chronicles", "Skynet", "Portids", "Corvids", - "University of Reading", "Magnus Carlsen", "Warhammer", - "Asimov", "Peter Watts", "William Gibson", "Iain Banks", - "Peter Hamilton", "Arthur C. 
Clarke", "Profiles of the Future", - "Dune", "Brave New World", "Iron Sunrise", "Wall-E", - "George RR Martin", "Alastair Reynolds", "Ovid", - "zoology", "psychology", "spiders", "arachnids", "insects", - ] - for entity in known_entities: - if entity.lower() in answer_text.lower(): - keywords.append(entity) - - return keywords - - # Run answer benchmark with the best config from search benchmark - if best_config: - eval_min_score, eval_max_hits = best_config - else: - eval_min_score, eval_max_hits = 0.80, 10 - - print(f"Using parameters: min_score={eval_min_score}, max_hits={eval_max_hits}") - print("-" * 72) - print(f"{'#':<4} | {'Question':<45} | {'Keywords Found':<14} | {'Msgs':<5}") - print("-" * 72) - answer_hits = 0 - answer_keyword_scores: list[float] = [] - for idx, (question, answer, q_emb) in enumerate(answerable, 1): - scored_results = vbase.fuzzy_lookup_embedding( - q_emb, max_hits=eval_max_hits, min_score=eval_min_score +async def run_benchmark( + model_spec: str | None, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, +) -> None: + load_dotenv() + + repo_root = Path(__file__).resolve().parent.parent + message_texts = load_message_texts(repo_root) + query_cases = load_search_queries(repo_root) + if not query_cases: + raise ValueError("No search queries with messageMatches found in the dataset") + model, vector_base = await build_vector_base(model_spec, message_texts, batch_size) + query_embeddings = await model.get_embeddings([case.query for case in query_cases]) + + rows: list[BenchmarkRow] = [] + for min_score in min_scores: + for max_hits in max_hits_values: + metrics = evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, ) - retrieved_indices = [sr.item for sr in scored_results] + rows.append(BenchmarkRow(min_score, max_hits, metrics)) - # Concatenate the text of all retrieved messages - retrieved_text = " ".join( - message_texts[i] for i in retrieved_indices if i < 
len(message_texts) - ) + print(f"Model: {model.model_name}") + print(f"Messages indexed: {len(message_texts)}") + print(f"Queries evaluated: {len(query_cases)}") + print() + print_rows(rows) - # Check how many answer keywords appear in retrieved text - keywords = extract_answer_keywords(answer) - if keywords: - found = sum( - 1 for kw in keywords - if kw.lower() in retrieved_text.lower() - ) - keyword_score = found / len(keywords) - else: - # No keywords extracted — just check if we retrieved anything - keyword_score = 1.0 if retrieved_indices else 0.0 - - if keyword_score > 0: - answer_hits += 1 - answer_keyword_scores.append(keyword_score) - - q_display = question[:42] + "..." if len(question) > 45 else question - kw_display = f"{int(keyword_score * 100):>3}%" - if keywords: - kw_display += f" ({sum(1 for kw in keywords if kw.lower() in retrieved_text.lower())}/{len(keywords)})" - print(f"{idx:<4} | {q_display:<45} | {kw_display:<14} | {len(retrieved_indices):<5}") - - print("-" * 72) - - if answerable: - answer_hit_rate = (answer_hits / len(answerable)) * 100 - avg_keyword_score = mean(answer_keyword_scores) * 100 - print(f"Answer Hit Rate: {answer_hit_rate:.1f}% " - f"({answer_hits}/{len(answerable)} queries found relevant content)") - print(f"Avg Keyword Coverage: {avg_keyword_score:.1f}%") - - # Evaluate unanswerable queries — ideally these should retrieve fewer/no results - if unanswerable: - print() - print("-" * 72) - print("Unanswerable queries (should ideally retrieve less relevant content):") - print("-" * 72) - false_positive_count = 0 - for question, answer, q_emb in unanswerable: - scored_results = vbase.fuzzy_lookup_embedding( - q_emb, max_hits=eval_max_hits, min_score=eval_min_score - ) - n_results = len(scored_results) - avg_score = mean(sr.score for sr in scored_results) if scored_results else 0.0 - q_display = question[:55] + "..." 
if len(question) > 58 else question - flag = "[!]" if n_results > 3 else "[ok]" - if n_results > 3: - false_positive_count += 1 - print(f" {flag} {q_display:<58} | {n_results:>3} results (avg={avg_score:.3f})") - print(f"\nFalse positives (>3 results): {false_positive_count}/{len(unanswerable)}") - - # ── Summary ── + best_row = select_best_row(rows) print() - print("=" * 72) - print(" SUMMARY") - print("=" * 72) - print(f"Model: {model.model_name}") - print(f"Messages indexed: {len(message_texts)}") - print(f"Search queries tested: {len(search_queries)}") - if best_config: - print(f"Best search params: min_score={best_config[0]}, max_hits={best_config[1]}") - print(f"Best search MRR: {best_mrr:.4f}") - if answer_queries: - print(f"Answer queries tested: {len(answerable)} answerable, {len(unanswerable)} unanswerable") - if answerable: - print(f"Answer hit rate: {answer_hit_rate:.1f}%") - print(f"Keyword coverage: {avg_keyword_score:.1f}%") - print("=" * 72) + print("Best-scoring benchmark row:") + print(f" min_score={best_row.min_score:.2f}") + print(f" max_hits={best_row.max_hits}") + print(f" hit_rate={best_row.metrics.hit_rate:.2f}%") + print(f" mrr={best_row.metrics.mean_reciprocal_rank:.4f}") def main() -> None: - parser = argparse.ArgumentParser(description="Benchmark embedding model parameters.") + parser = argparse.ArgumentParser( + description="Benchmark retrieval settings for an embedding model." + ) parser.add_argument( "--model", type=str, default=None, help="Provider and model name, e.g. 
'openai:text-embedding-3-small'", ) + parser.add_argument( + "--min-scores", + type=str, + default=None, + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=None, + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the index.", + ) args = parser.parse_args() - asyncio.run(run_benchmark(args.model)) + + asyncio.run( + run_benchmark( + model_spec=args.model, + min_scores=parse_float_list(args.min_scores), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + ) + ) if __name__ == "__main__": diff --git a/tools/repeat_embedding_benchmarks.py b/tools/repeat_embedding_benchmarks.py new file mode 100644 index 00000000..bc2a741a --- /dev/null +++ b/tools/repeat_embedding_benchmarks.py @@ -0,0 +1,399 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Run embedding benchmarks repeatedly and save raw/summary JSON results. + +This script runs `tools/benchmark_embeddings.py` logic multiple times for each +embedding model, stores every run as JSON, and writes aggregate summaries that +can be used to justify tuned defaults. 
+ +Usage: + uv run python tools/repeat_embedding_benchmarks.py + uv run python tools/repeat_embedding_benchmarks.py --runs 30 + uv run python tools/repeat_embedding_benchmarks.py --models openai:text-embedding-3-small,openai:text-embedding-3-large,openai:text-embedding-ada-002 + uv run python tools/repeat_embedding_benchmarks.py --models openai:text-embedding-3-small --min-score-start 0.01 --min-score-stop 0.20 --min-score-step 0.01 +""" + +import argparse +import asyncio +from dataclasses import asdict, dataclass +from datetime import datetime, UTC +import json +from pathlib import Path +from statistics import mean + +import benchmark_embeddings +from dotenv import load_dotenv + +BenchmarkRow = benchmark_embeddings.BenchmarkRow +DEFAULT_MAX_HITS = benchmark_embeddings.DEFAULT_MAX_HITS +parse_int_list = benchmark_embeddings.parse_int_list +resolve_min_scores = benchmark_embeddings.resolve_min_scores + +DEFAULT_MODELS = [ + "openai:text-embedding-3-small", + "openai:text-embedding-3-large", + "openai:text-embedding-ada-002", +] +DEFAULT_OUTPUT_DIR = Path("benchmark_results") + + +@dataclass +class RunRow: + """Serialized benchmark row for one repeated run.""" + + min_score: float + max_hits: int + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class RunResult: + """All measurements captured for one benchmark repetition.""" + + run_index: int + model_spec: str + resolved_model_name: str + message_count: int + query_count: int + min_top_score: float + mean_top_score: float + max_top_score: float + rows: list[RunRow] + best_row: RunRow + + +def sanitize_model_name(model_spec: str) -> str: + """Convert a model spec into a filesystem-safe directory name.""" + + return model_spec.replace(":", "__").replace("/", "_").replace("\\", "_") + + +def benchmark_row_to_run_row(row: BenchmarkRow) -> RunRow: + """Flatten a benchmark row into the JSON-friendly repeated-run shape.""" + + return RunRow( + min_score=row.min_score, + max_hits=row.max_hits, + 
hit_rate=row.metrics.hit_rate, + mean_reciprocal_rank=row.metrics.mean_reciprocal_rank, + ) + + +def summarize_runs(model_spec: str, runs: list[RunResult]) -> dict[str, object]: + """Average repeated benchmark runs into a per-model summary payload.""" + + summary_rows: dict[tuple[float, int], list[RunRow]] = {} + for run in runs: + for row in run.rows: + summary_rows.setdefault((row.min_score, row.max_hits), []).append(row) + + averaged_rows: list[dict[str, float | int]] = [] + for (min_score, max_hits), rows in sorted(summary_rows.items()): + averaged_rows.append( + { + "min_score": min_score, + "max_hits": max_hits, + "mean_hit_rate": mean(row.hit_rate for row in rows), + "mean_mrr": mean(row.mean_reciprocal_rank for row in rows), + } + ) + + best_rows = [run.best_row for run in runs] + best_min_score_counts: dict[str, int] = {} + best_max_hits_counts: dict[str, int] = {} + for row in best_rows: + best_min_score_counts[f"{row.min_score:.2f}"] = ( + best_min_score_counts.get(f"{row.min_score:.2f}", 0) + 1 + ) + best_max_hits_counts[str(row.max_hits)] = ( + best_max_hits_counts.get(str(row.max_hits), 0) + 1 + ) + + averaged_best_row = max( + averaged_rows, + key=lambda row: ( + float(row["mean_mrr"]), + float(row["mean_hit_rate"]), + float(row["min_score"]), + -int(row["max_hits"]), + ), + ) + + return { + "model_spec": model_spec, + "resolved_model_name": runs[0].resolved_model_name, + "run_count": len(runs), + "message_count": runs[0].message_count, + "query_count": runs[0].query_count, + "min_top_score": mean(run.min_top_score for run in runs), + "mean_top_score": mean(run.mean_top_score for run in runs), + "max_top_score": mean(run.max_top_score for run in runs), + "candidate_rows": averaged_rows, + "recommended_row": averaged_best_row, + "best_min_score_counts": best_min_score_counts, + "best_max_hits_counts": best_max_hits_counts, + } + + +def write_json(path: Path, data: object) -> None: + """Write a JSON artifact with stable indentation for review and 
reuse.""" + + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + + +def write_markdown_summary(path: Path, summaries: list[dict[str, object]]) -> None: + """Write the reviewer-facing markdown summary for all benchmarked models.""" + + lines = [ + "# Repeated Embedding Benchmark Summary", + "", + "| Model | Runs | Recommended min_score | Recommended max_hits | Mean hit rate | Mean MRR |", + "| --- | ---: | ---: | ---: | ---: | ---: |", + ] + for summary in summaries: + recommended_row = summary["recommended_row"] + assert isinstance(recommended_row, dict) + lines.append( + "| " + f"{summary['resolved_model_name']} | " + f"{summary['run_count']} | " + f"{recommended_row['min_score']:.2f} | " + f"{recommended_row['max_hits']} | " + f"{recommended_row['mean_hit_rate']:.2f} | " + f"{recommended_row['mean_mrr']:.4f} |" + ) + lines.append("") + for summary in summaries: + lines.append( + f"- {summary['resolved_model_name']}: observed top-1 score range " + f"{summary['min_top_score']:.4f}..{summary['max_top_score']:.4f} " + f"(mean {summary['mean_top_score']:.4f})." 
+ ) + lines.append("") + path.write_text("\n".join(lines), encoding="utf-8") + + +async def run_single_model_benchmark( + model_spec: str, + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_dir: Path, +) -> dict[str, object]: + """Run the benchmark repeatedly for one model and persist raw artifacts.""" + + repo_root = Path(__file__).resolve().parent.parent + message_texts = benchmark_embeddings.load_message_texts(repo_root) + query_cases = benchmark_embeddings.load_search_queries(repo_root) + model_output_dir = output_dir / sanitize_model_name(model_spec) + model_output_dir.mkdir(parents=True, exist_ok=True) + + run_results: list[RunResult] = [] + for run_index in range(1, runs + 1): + model, vector_base = await benchmark_embeddings.build_vector_base( + model_spec, + message_texts, + batch_size, + ) + query_embeddings = await model.get_embeddings( + [case.query for case in query_cases] + ) + top_score_stats = benchmark_embeddings.measure_top_score_stats( + vector_base, + query_embeddings, + ) + effective_min_scores, skipped_min_scores = ( + benchmark_embeddings.filter_min_scores_by_ceiling( + min_scores, + top_score_stats.max_top_score, + ) + ) + if not effective_min_scores: + raise ValueError( + "No requested min_score values are below the observed top-score ceiling " + f"of {top_score_stats.max_top_score:.4f} for {model.model_name}" + ) + if skipped_min_scores: + print( + f"Skipping {len(skipped_min_scores)} min_score values above " + f"{top_score_stats.max_top_score:.4f} for {model.model_name}" + ) + benchmark_rows: list[benchmark_embeddings.BenchmarkRow] = [] + for min_score in effective_min_scores: + for max_hits in max_hits_values: + metrics = benchmark_embeddings.evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, + ) + benchmark_rows.append( + benchmark_embeddings.BenchmarkRow(min_score, max_hits, metrics) + ) + + best_row = 
benchmark_embeddings.select_best_row(benchmark_rows) + run_result = RunResult( + run_index=run_index, + model_spec=model_spec, + resolved_model_name=model.model_name, + message_count=len(message_texts), + query_count=len(query_cases), + min_top_score=top_score_stats.min_top_score, + mean_top_score=top_score_stats.mean_top_score, + max_top_score=top_score_stats.max_top_score, + rows=[benchmark_row_to_run_row(row) for row in benchmark_rows], + best_row=benchmark_row_to_run_row(best_row), + ) + run_results.append(run_result) + write_json(model_output_dir / f"run_{run_index:02d}.json", asdict(run_result)) + + summary = summarize_runs(model_spec, run_results) + write_json(model_output_dir / "summary.json", summary) + return summary + + +async def run_repeated_benchmarks( + models: list[str], + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_root: Path, +) -> Path: + """Run the benchmark suite for each requested model and save the artifacts.""" + + timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") + output_dir = output_root / timestamp + output_dir.mkdir(parents=True, exist_ok=True) + + metadata = { + "created_at_utc": timestamp, + "runs_per_model": runs, + "models": models, + "min_scores": min_scores, + "max_hits_values": max_hits_values, + "batch_size": batch_size, + } + write_json(output_dir / "metadata.json", metadata) + + summaries: list[dict[str, object]] = [] + for model_spec in models: + print(f"Running {runs} benchmark iterations for {model_spec}...") + summary = await run_single_model_benchmark( + model_spec=model_spec, + runs=runs, + min_scores=min_scores, + max_hits_values=max_hits_values, + batch_size=batch_size, + output_dir=output_dir, + ) + summaries.append(summary) + + write_json(output_dir / "summary.json", summaries) + write_markdown_summary(output_dir / "summary.md", summaries) + return output_dir + + +def parse_models(raw: str | None) -> list[str]: + """Parse the model list or fall back to the 
built-in OpenAI benchmark set.""" + + if raw is None: + return DEFAULT_MODELS + models = [item.strip() for item in raw.split(",") if item.strip()] + if not models: + raise ValueError("--models must contain at least one model") + return models + + +def main() -> None: + """Parse CLI arguments and run repeated embedding benchmarks.""" + + parser = argparse.ArgumentParser( + description="Run embedding benchmarks repeatedly and save JSON results." + ) + parser.add_argument( + "--models", + type=str, + default=None, + help="Comma-separated model specs to benchmark.", + ) + parser.add_argument( + "--runs", + type=int, + default=30, + help="Number of repeated runs per model.", + ) + parser.add_argument( + "--min-scores", + type=str, + default=None, + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--min-score-start", + type=float, + default=None, + help="Inclusive start of a generated min_score range.", + ) + parser.add_argument( + "--min-score-stop", + type=float, + default=None, + help="Inclusive end of a generated min_score range.", + ) + parser.add_argument( + "--min-score-step", + type=float, + default=None, + help="Step size for a generated min_score range.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=",".join(str(value) for value in DEFAULT_MAX_HITS), + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the index.", + ) + parser.add_argument( + "--output-dir", + type=str, + default=str(DEFAULT_OUTPUT_DIR), + help="Directory where benchmark results will be written.", + ) + args = parser.parse_args() + + if args.runs <= 0: + raise ValueError("--runs must be a positive integer") + if args.batch_size <= 0: + raise ValueError("--batch-size must be a positive integer") + + load_dotenv() + output_dir = asyncio.run( + run_repeated_benchmarks( + models=parse_models(args.models), + runs=args.runs, + 
min_scores=resolve_min_scores( + args.min_scores, + args.min_score_start, + args.min_score_stop, + args.min_score_step, + ), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + output_root=Path(args.output_dir), + ) + ) + print(f"Wrote benchmark results to {output_dir}") + + +if __name__ == "__main__": + main() From 619c9ec1b8328dc87fc9755f5049305b3401fdf8 Mon Sep 17 00:00:00 2001 From: shreejaykurhade Date: Wed, 22 Apr 2026 00:27:43 +0530 Subject: [PATCH 4/4] add tests --- tests/test_benchmark_embeddings.py | 103 ++++++++++++++++ tests/test_vectorbase.py | 113 +++++++++++++++-- tools/benchmark_embeddings.py | 188 +++++++++++++++++++++++++++-- 3 files changed, 383 insertions(+), 21 deletions(-) create mode 100644 tests/test_benchmark_embeddings.py diff --git a/tests/test_benchmark_embeddings.py b/tests/test_benchmark_embeddings.py new file mode 100644 index 00000000..6e822c26 --- /dev/null +++ b/tests/test_benchmark_embeddings.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +from importlib.util import module_from_spec, spec_from_file_location +from pathlib import Path + +import pytest + +MODULE_PATH = ( + Path(__file__).resolve().parent.parent / "tools" / "benchmark_embeddings.py" +) +SPEC = spec_from_file_location("benchmark_embeddings_for_test", MODULE_PATH) +assert SPEC is not None +assert SPEC.loader is not None +BENCHMARK_EMBEDDINGS = module_from_spec(SPEC) +SPEC.loader.exec_module(BENCHMARK_EMBEDDINGS) + +BenchmarkRow = BENCHMARK_EMBEDDINGS.BenchmarkRow +SearchMetrics = BENCHMARK_EMBEDDINGS.SearchMetrics +build_float_range = BENCHMARK_EMBEDDINGS.build_float_range +filter_min_scores_by_ceiling = BENCHMARK_EMBEDDINGS.filter_min_scores_by_ceiling +load_message_texts = BENCHMARK_EMBEDDINGS.load_message_texts +parse_float_list = BENCHMARK_EMBEDDINGS.parse_float_list +resolve_min_scores = BENCHMARK_EMBEDDINGS.resolve_min_scores +select_best_row = BENCHMARK_EMBEDDINGS.select_best_row + + +def make_row( + min_score: float, + max_hits: int, + hit_rate: float, + mean_reciprocal_rank: float, +) -> BenchmarkRow: + """Build a benchmark row without repeating nested metrics boilerplate.""" + + return BenchmarkRow( + min_score=min_score, + max_hits=max_hits, + metrics=SearchMetrics( + hit_rate=hit_rate, + mean_reciprocal_rank=mean_reciprocal_rank, + ), + ) + + +def test_select_best_row_prefers_higher_min_score_on_metric_tie() -> None: + rows = [ + make_row(0.25, 15, 98.5, 0.7514), + make_row(0.70, 15, 98.5, 0.7514), + ] + + best_row = select_best_row(rows) + + assert best_row.min_score == 0.70 + assert best_row.max_hits == 15 + + +def test_select_best_row_prefers_lower_max_hits_on_full_tie() -> None: + rows = [ + make_row(0.70, 20, 98.5, 0.7514), + make_row(0.70, 15, 98.5, 0.7514), + ] + + best_row = select_best_row(rows) + + assert best_row.min_score == 0.70 + assert best_row.max_hits == 15 + + +def test_parse_float_list_defaults_to_tenth_point_grid() -> None: + assert parse_float_list(None) == [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] + 
+ +def test_build_float_range_supports_hundredth_point_sweeps() -> None: + assert build_float_range(0.01, 0.05, 0.01) == [0.01, 0.02, 0.03, 0.04, 0.05] + + +def test_resolve_min_scores_uses_generated_range() -> None: + assert resolve_min_scores(None, 0.01, 0.03, 0.01) == [0.01, 0.02, 0.03] + + +def test_resolve_min_scores_rejects_mixed_inputs() -> None: + with pytest.raises(ValueError, match="Use either --min-scores"): + resolve_min_scores("0.1,0.2", 0.01, 0.03, 0.01) + + +def test_filter_min_scores_by_ceiling_skips_guaranteed_zero_rows() -> None: + effective_scores, skipped_scores = filter_min_scores_by_ceiling( + [0.01, 0.16, 0.17, 0.5], + 0.16, + ) + + assert effective_scores == [0.01, 0.16] + assert skipped_scores == [0.17, 0.5] + + +def test_load_message_texts_returns_one_text_blob_per_message() -> None: + repo_root = Path(__file__).resolve().parent.parent + + message_texts = load_message_texts(repo_root) + + assert message_texts + assert all(isinstance(text, str) for text in message_texts) diff --git a/tests/test_vectorbase.py b/tests/test_vectorbase.py index 81ccecc6..bb9ebb57 100644 --- a/tests/test_vectorbase.py +++ b/tests/test_vectorbase.py @@ -7,11 +7,42 @@ from typeagent.aitools.embeddings import ( CachingEmbeddingModel, NormalizedEmbedding, + NormalizedEmbeddings, ) from typeagent.aitools.model_adapters import ( create_test_embedding_model, ) -from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase +from typeagent.aitools.vectorbase import ( + DEFAULT_MIN_SCORE, + TextEmbeddingIndexSettings, + VectorBase, +) + + +class FakeEmbeddingModel: + """Minimal embedding model stub for settings tests.""" + + def __init__(self, model_name: str) -> None: + self.model_name = model_name + + def add_embedding(self, key: str, embedding: NormalizedEmbedding) -> None: + del key, embedding + + async def get_embedding_nocache(self, input: str) -> NormalizedEmbedding: + del input + return np.array([1.0], dtype=np.float32) + + async def 
get_embeddings_nocache(self, input: list[str]) -> NormalizedEmbeddings: + del input + return np.array([[1.0]], dtype=np.float32) + + async def get_embedding(self, key: str) -> NormalizedEmbedding: + del key + return np.array([1.0], dtype=np.float32) + + async def get_embeddings(self, keys: list[str]) -> NormalizedEmbeddings: + del keys + return np.array([[1.0]], dtype=np.float32) @pytest.fixture(scope="function") @@ -38,7 +69,7 @@ def sample_embeddings() -> Samples: } -def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples): +def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding embeddings to the VectorBase.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -48,7 +79,7 @@ def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples): np.testing.assert_array_equal(vector_base.serialize_embedding_at(i), embedding) -def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples): +def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Adding multiple embeddings at once matches repeated single adds.""" keys = list(sample_embeddings.keys()) for key, embedding in sample_embeddings.items(): @@ -71,7 +102,7 @@ def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding keys to the VectorBase.""" for key in sample_embeddings: await vector_base.add_key(key) @@ -80,7 +111,9 @@ async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_key_no_cache(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_key_no_cache( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test adding keys to the 
VectorBase with cache disabled.""" for key in sample_embeddings: await vector_base.add_key(key, cache=False) @@ -91,7 +124,7 @@ async def test_add_key_no_cache(vector_base: VectorBase, sample_embeddings: Samp @pytest.mark.asyncio -async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding multiple keys to the VectorBase.""" keys = list(sample_embeddings.keys()) await vector_base.add_keys(keys) @@ -100,7 +133,9 @@ async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_keys_no_cache(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_keys_no_cache( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test adding multiple keys to the VectorBase with cache disabled.""" keys = list(sample_embeddings.keys()) await vector_base.add_keys(keys, cache=False) @@ -111,7 +146,9 @@ async def test_add_keys_no_cache(vector_base: VectorBase, sample_embeddings: Sam @pytest.mark.asyncio -async def test_fuzzy_lookup(vector_base: VectorBase, sample_embeddings: Samples): +async def test_fuzzy_lookup( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test fuzzy lookup functionality.""" for key in sample_embeddings: await vector_base.add_key(key) @@ -122,7 +159,7 @@ async def test_fuzzy_lookup(vector_base: VectorBase, sample_embeddings: Samples) assert results[0].score > 0.9 # High similarity score for the same word -def test_clear(vector_base: VectorBase, sample_embeddings: Samples): +def test_clear(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test clearing the VectorBase.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -132,7 +169,9 @@ def test_clear(vector_base: VectorBase, sample_embeddings: Samples): assert len(vector_base) == 0 -def test_serialize_deserialize(vector_base: 
@pytest.mark.parametrize(
    ("model_name", "expected_min_score"),
    [
        ("text-embedding-3-large", 0.07),
        ("text-embedding-3-small", 0.16),
        ("text-embedding-ada-002", 0.72),
    ],
)
def test_text_embedding_index_settings_uses_known_model_default(
    model_name: str, expected_min_score: float
) -> None:
    """A recognized model name selects its tuned min_score, with no match cap."""
    index_settings = TextEmbeddingIndexSettings(
        embedding_model=FakeEmbeddingModel(model_name)
    )

    assert index_settings.min_score == expected_min_score
    assert index_settings.max_matches is None


def test_text_embedding_index_settings_keeps_unknown_model_fallback() -> None:
    """An unrecognized model name falls back to the generic default threshold."""
    index_settings = TextEmbeddingIndexSettings(
        embedding_model=FakeEmbeddingModel("custom-embedding-model")
    )

    assert index_settings.min_score == DEFAULT_MIN_SCORE
    assert index_settings.max_matches is None


def test_text_embedding_index_settings_explicit_overrides_win() -> None:
    """Caller-supplied min_score/max_matches beat the model-based defaults."""
    index_settings = TextEmbeddingIndexSettings(
        embedding_model=FakeEmbeddingModel("text-embedding-3-large"),
        min_score=0.55,
        max_matches=7,
    )

    assert index_settings.min_score == 0.55
    assert index_settings.max_matches == 7


def test_text_embedding_index_settings_invalid_max_matches_becomes_none() -> None:
    """A non-positive max_matches is treated as 'no limit' rather than kept."""
    index_settings = TextEmbeddingIndexSettings(
        embedding_model=FakeEmbeddingModel("text-embedding-3-large"),
        max_matches=0,
    )

    assert index_settings.max_matches is None
0.40, 0.50, 0.60, 0.70, 0.75, 0.80, 0.85] +DEFAULT_MIN_SCORES = [score / 10 for score in range(1, 10)] DEFAULT_MAX_HITS = [5, 10, 15, 20] DATA_DIR = Path("tests") / "testdata" INDEX_DATA_PATH = DATA_DIR / "Episode_53_AdrianTchaikovsky_index_data.json" @@ -40,24 +48,41 @@ @dataclass class SearchQueryCase: + """A benchmark query paired with the message ordinals it should retrieve.""" + query: str expected_matches: list[int] @dataclass class SearchMetrics: + """Aggregate retrieval quality metrics for one benchmark row.""" + hit_rate: float mean_reciprocal_rank: float +@dataclass +class TopScoreStats: + """Observed top-1 score statistics across all benchmark queries.""" + + min_top_score: float + mean_top_score: float + max_top_score: float + + @dataclass class BenchmarkRow: + """One `(min_score, max_hits)` configuration evaluated by the benchmark.""" + min_score: float max_hits: int metrics: SearchMetrics def parse_float_list(raw: str | None) -> list[float]: + """Parse explicit min-score values or fall back to the coarse default grid.""" + if raw is None: return DEFAULT_MIN_SCORES values = [float(item.strip()) for item in raw.split(",") if item.strip()] @@ -66,7 +91,54 @@ def parse_float_list(raw: str | None) -> list[float]: return values +def build_float_range(start: float, stop: float, step: float) -> list[float]: + """Build an inclusive decimal-safe float range for score sweeps.""" + + if step <= 0: + raise ValueError("--min-score-step must be positive") + if start > stop: + raise ValueError("--min-score-start must be <= --min-score-stop") + + start_decimal = Decimal(str(start)) + stop_decimal = Decimal(str(stop)) + step_decimal = Decimal(str(step)) + values: list[float] = [] + current = start_decimal + while current <= stop_decimal: + values.append(float(current)) + current += step_decimal + return values + + +def resolve_min_scores( + raw: str | None, + start: float | None, + stop: float | None, + step: float | None, +) -> list[float]: + """Resolve the benchmark 
min-score grid from explicit values or a generated range.""" + + range_args = [start, stop, step] + using_range = any(value is not None for value in range_args) + if using_range: + if raw is not None: + raise ValueError( + "Use either --min-scores or the --min-score-start/stop/step range" + ) + if any(value is None for value in range_args): + raise ValueError( + "--min-score-start, --min-score-stop, and --min-score-step must all be set together" + ) + assert start is not None + assert stop is not None + assert step is not None + return build_float_range(start, stop, step) + return parse_float_list(raw) + + def parse_int_list(raw: str | None) -> list[int]: + """Parse positive integer arguments such as `max_hits` grids.""" + if raw is None: return DEFAULT_MAX_HITS values = [int(item.strip()) for item in raw.split(",") if item.strip()] @@ -78,12 +150,16 @@ def parse_int_list(raw: str | None) -> list[int]: def load_message_texts(repo_root: Path) -> list[str]: + """Load the benchmark corpus as one text blob per message.""" + index_data = json.loads((repo_root / INDEX_DATA_PATH).read_text(encoding="utf-8")) messages = index_data["messages"] return [" ".join(message.get("textChunks", [])) for message in messages] def load_search_queries(repo_root: Path) -> list[SearchQueryCase]: + """Load benchmark queries that include message-level ground-truth matches.""" + search_data = json.loads( (repo_root / SEARCH_RESULTS_PATH).read_text(encoding="utf-8") ) @@ -105,6 +181,8 @@ async def build_vector_base( message_texts: list[str], batch_size: int, ) -> tuple[IEmbeddingModel, VectorBase]: + """Build a message-level vector index for the benchmark corpus.""" + model = create_embedding_model(model_spec) settings = TextEmbeddingIndexSettings( embedding_model=model, @@ -113,7 +191,6 @@ async def build_vector_base( batch_size=batch_size, ) vector_base = VectorBase(settings) - for start in range(0, len(message_texts), batch_size): batch = message_texts[start : start + batch_size] await 
def filter_min_scores_by_ceiling(
    min_scores: list[float], max_top_score: float
) -> tuple[list[float], list[float]]:
    """Split requested min_score thresholds into usable and unreachable ones.

    A threshold above the best observed top-1 score (plus a tiny tolerance
    for float comparison) can never admit a match for this model/corpus, so
    it is routed to the skipped list instead of being benchmarked. Input
    order is preserved in both returned lists.
    """
    ceiling = max_top_score + 1e-9
    usable: list[float] = []
    unreachable: list[float] = []
    for threshold in min_scores:
        bucket = usable if threshold <= ceiling else unreachable
        bucket.append(threshold)
    return usable, unreachable
evaluated grid.""" + load_dotenv() repo_root = Path(__file__).resolve().parent.parent @@ -194,11 +315,25 @@ async def run_benchmark( query_cases = load_search_queries(repo_root) if not query_cases: raise ValueError("No search queries with messageMatches found in the dataset") - model, vector_base = await build_vector_base(model_spec, message_texts, batch_size) + model, vector_base = await build_vector_base( + model_spec, + message_texts, + batch_size, + ) query_embeddings = await model.get_embeddings([case.query for case in query_cases]) + top_score_stats = measure_top_score_stats(vector_base, query_embeddings) + effective_min_scores, skipped_min_scores = filter_min_scores_by_ceiling( + min_scores, + top_score_stats.max_top_score, + ) + if not effective_min_scores: + raise ValueError( + "No requested min_score values are below the observed top-score ceiling " + f"of {top_score_stats.max_top_score:.4f}" + ) rows: list[BenchmarkRow] = [] - for min_score in min_scores: + for min_score in effective_min_scores: for max_hits in max_hits_values: metrics = evaluate_search_queries( vector_base, @@ -212,6 +347,16 @@ async def run_benchmark( print(f"Model: {model.model_name}") print(f"Messages indexed: {len(message_texts)}") print(f"Queries evaluated: {len(query_cases)}") + print( + "Observed top-1 score range: " + f"{top_score_stats.min_top_score:.4f}..{top_score_stats.max_top_score:.4f} " + f"(mean {top_score_stats.mean_top_score:.4f})" + ) + if skipped_min_scores: + print( + f"Skipped {len(skipped_min_scores)} min_score values above " + f"{top_score_stats.max_top_score:.4f}; they cannot return any matches." + ) print() print_rows(rows) @@ -225,6 +370,8 @@ async def run_benchmark( def main() -> None: + """Parse CLI arguments and run the benchmark once.""" + parser = argparse.ArgumentParser( description="Benchmark retrieval settings for an embedding model." 
) @@ -240,6 +387,24 @@ def main() -> None: default=None, help="Comma-separated min_score values to test.", ) + parser.add_argument( + "--min-score-start", + type=float, + default=None, + help="Inclusive start of a generated min_score range.", + ) + parser.add_argument( + "--min-score-stop", + type=float, + default=None, + help="Inclusive end of a generated min_score range.", + ) + parser.add_argument( + "--min-score-step", + type=float, + default=None, + help="Step size for a generated min_score range.", + ) parser.add_argument( "--max-hits", type=str, @@ -253,11 +418,18 @@ def main() -> None: help="Batch size used when building the index.", ) args = parser.parse_args() + if args.batch_size <= 0: + raise ValueError("--batch-size must be a positive integer") asyncio.run( run_benchmark( model_spec=args.model, - min_scores=parse_float_list(args.min_scores), + min_scores=resolve_min_scores( + args.min_scores, + args.min_score_start, + args.min_score_stop, + args.min_score_step, + ), max_hits_values=parse_int_list(args.max_hits), batch_size=args.batch_size, )