diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index e22083c..8f898eb 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -13,15 +13,46 @@ ) from .model_adapters import create_embedding_model +DEFAULT_MIN_SCORE = 0.85 + +# Empirical defaults for built-in OpenAI embedding models. +# These values come from repeated runs of the Adrian Tchaikovsky Episode 53 +# search benchmark in `tools/repeat_embedding_benchmarks.py`, using an +# exhaustive 0.01..1.00 min_score sweep on that dataset. We keep the highest +# min_score that preserves the best benchmark metrics for each model, which +# yielded the current plateau boundaries of 0.16 for +# `text-embedding-3-small`, 0.07 for `text-embedding-3-large`, and 0.72 for +# `text-embedding-ada-002`. These are repository defaults for known models, +# not universal truths. Unknown models keep the long-standing fallback score +# of 0.85. Callers can always override `min_score` explicitly for their own +# use cases or models. We intentionally leave `max_matches` out of this table: +# the benchmark still reports a best `max_hits` row, but the library default +# remains `None` unless a caller opts into a specific limit. 
+MODEL_DEFAULT_MIN_SCORES: dict[str, float] = { + "text-embedding-3-large": 0.07, + "text-embedding-3-small": 0.16, + "text-embedding-ada-002": 0.72, +} + + +def get_default_min_score(model_name: str) -> float: + """Return the repository default score cutoff for a known model name.""" + + return MODEL_DEFAULT_MIN_SCORES.get(model_name, DEFAULT_MIN_SCORE) + @dataclass class ScoredInt: + """Associate an integer ordinal with its similarity score.""" + item: int score: float @dataclass class TextEmbeddingIndexSettings: + """Runtime settings for embedding-backed fuzzy lookup.""" + embedding_model: IEmbeddingModel min_score: float # Between 0.0 and 1.0 max_matches: int | None # >= 1; None means no limit @@ -34,10 +65,12 @@ def __init__( max_matches: int | None = None, batch_size: int | None = None, ): - self.min_score = min_score if min_score is not None else 0.85 + self.embedding_model = embedding_model or create_embedding_model() + model_name = getattr(self.embedding_model, "model_name", "") + default_min_score = get_default_min_score(model_name) + self.min_score = min_score if min_score is not None else default_min_score self.max_matches = max_matches if max_matches and max_matches >= 1 else None self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 - self.embedding_model = embedding_model or create_embedding_model() class VectorBase: @@ -166,27 +199,10 @@ def fuzzy_lookup_embedding_in_subset( max_hits: int | None = None, min_score: float | None = None, ) -> list[ScoredInt]: - if max_hits is None: - max_hits = 10 - if min_score is None: - min_score = 0.0 - if not ordinals_of_subset or len(self._vectors) == 0: - return [] - # Compute dot products only for the subset instead of all vectors. 
- subset = np.asarray(ordinals_of_subset) - scores = np.dot(self._vectors[subset], embedding) - indices = np.flatnonzero(scores >= min_score) - if len(indices) == 0: - return [] - filtered_scores = scores[indices] - if len(indices) <= max_hits: - order = np.argsort(filtered_scores)[::-1] - else: - top_k = np.argpartition(filtered_scores, -max_hits)[-max_hits:] - order = top_k[np.argsort(filtered_scores[top_k])[::-1]] - return [ - ScoredInt(int(subset[indices[i]]), float(filtered_scores[i])) for i in order - ] + ordinals_set = set(ordinals_of_subset) + return self.fuzzy_lookup_embedding( + embedding, max_hits, min_score, lambda i: i in ordinals_set + ) async def fuzzy_lookup( self, @@ -235,7 +251,7 @@ def deserialize(self, data: NormalizedEmbeddings | None) -> None: return if self._embedding_size == 0: if data.ndim < 2 or data.shape[0] == 0: - # Empty data — can't determine size; just clear. + # Empty data: can't determine size; just clear. self.clear() return self._set_embedding_size(data.shape[1]) diff --git a/tests/test_benchmark_embeddings.py b/tests/test_benchmark_embeddings.py new file mode 100644 index 0000000..6e822c2 --- /dev/null +++ b/tests/test_benchmark_embeddings.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +from importlib.util import module_from_spec, spec_from_file_location +from pathlib import Path + +import pytest + +MODULE_PATH = ( + Path(__file__).resolve().parent.parent / "tools" / "benchmark_embeddings.py" +) +SPEC = spec_from_file_location("benchmark_embeddings_for_test", MODULE_PATH) +assert SPEC is not None +assert SPEC.loader is not None +BENCHMARK_EMBEDDINGS = module_from_spec(SPEC) +SPEC.loader.exec_module(BENCHMARK_EMBEDDINGS) + +BenchmarkRow = BENCHMARK_EMBEDDINGS.BenchmarkRow +SearchMetrics = BENCHMARK_EMBEDDINGS.SearchMetrics +build_float_range = BENCHMARK_EMBEDDINGS.build_float_range +filter_min_scores_by_ceiling = BENCHMARK_EMBEDDINGS.filter_min_scores_by_ceiling +load_message_texts = BENCHMARK_EMBEDDINGS.load_message_texts +parse_float_list = BENCHMARK_EMBEDDINGS.parse_float_list +resolve_min_scores = BENCHMARK_EMBEDDINGS.resolve_min_scores +select_best_row = BENCHMARK_EMBEDDINGS.select_best_row + + +def make_row( + min_score: float, + max_hits: int, + hit_rate: float, + mean_reciprocal_rank: float, +) -> BenchmarkRow: + """Build a benchmark row without repeating nested metrics boilerplate.""" + + return BenchmarkRow( + min_score=min_score, + max_hits=max_hits, + metrics=SearchMetrics( + hit_rate=hit_rate, + mean_reciprocal_rank=mean_reciprocal_rank, + ), + ) + + +def test_select_best_row_prefers_higher_min_score_on_metric_tie() -> None: + rows = [ + make_row(0.25, 15, 98.5, 0.7514), + make_row(0.70, 15, 98.5, 0.7514), + ] + + best_row = select_best_row(rows) + + assert best_row.min_score == 0.70 + assert best_row.max_hits == 15 + + +def test_select_best_row_prefers_lower_max_hits_on_full_tie() -> None: + rows = [ + make_row(0.70, 20, 98.5, 0.7514), + make_row(0.70, 15, 98.5, 0.7514), + ] + + best_row = select_best_row(rows) + + assert best_row.min_score == 0.70 + assert best_row.max_hits == 15 + + +def test_parse_float_list_defaults_to_tenth_point_grid() -> None: + assert parse_float_list(None) == [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] + 
+ +def test_build_float_range_supports_hundredth_point_sweeps() -> None: + assert build_float_range(0.01, 0.05, 0.01) == [0.01, 0.02, 0.03, 0.04, 0.05] + + +def test_resolve_min_scores_uses_generated_range() -> None: + assert resolve_min_scores(None, 0.01, 0.03, 0.01) == [0.01, 0.02, 0.03] + + +def test_resolve_min_scores_rejects_mixed_inputs() -> None: + with pytest.raises(ValueError, match="Use either --min-scores"): + resolve_min_scores("0.1,0.2", 0.01, 0.03, 0.01) + + +def test_filter_min_scores_by_ceiling_skips_guaranteed_zero_rows() -> None: + effective_scores, skipped_scores = filter_min_scores_by_ceiling( + [0.01, 0.16, 0.17, 0.5], + 0.16, + ) + + assert effective_scores == [0.01, 0.16] + assert skipped_scores == [0.17, 0.5] + + +def test_load_message_texts_returns_one_text_blob_per_message() -> None: + repo_root = Path(__file__).resolve().parent.parent + + message_texts = load_message_texts(repo_root) + + assert message_texts + assert all(isinstance(text, str) for text in message_texts) diff --git a/tests/test_vectorbase.py b/tests/test_vectorbase.py index 81ccecc..bb9ebb5 100644 --- a/tests/test_vectorbase.py +++ b/tests/test_vectorbase.py @@ -7,11 +7,42 @@ from typeagent.aitools.embeddings import ( CachingEmbeddingModel, NormalizedEmbedding, + NormalizedEmbeddings, ) from typeagent.aitools.model_adapters import ( create_test_embedding_model, ) -from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase +from typeagent.aitools.vectorbase import ( + DEFAULT_MIN_SCORE, + TextEmbeddingIndexSettings, + VectorBase, +) + + +class FakeEmbeddingModel: + """Minimal embedding model stub for settings tests.""" + + def __init__(self, model_name: str) -> None: + self.model_name = model_name + + def add_embedding(self, key: str, embedding: NormalizedEmbedding) -> None: + del key, embedding + + async def get_embedding_nocache(self, input: str) -> NormalizedEmbedding: + del input + return np.array([1.0], dtype=np.float32) + + async def 
get_embeddings_nocache(self, input: list[str]) -> NormalizedEmbeddings: + del input + return np.array([[1.0]], dtype=np.float32) + + async def get_embedding(self, key: str) -> NormalizedEmbedding: + del key + return np.array([1.0], dtype=np.float32) + + async def get_embeddings(self, keys: list[str]) -> NormalizedEmbeddings: + del keys + return np.array([[1.0]], dtype=np.float32) @pytest.fixture(scope="function") @@ -38,7 +69,7 @@ def sample_embeddings() -> Samples: } -def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples): +def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding embeddings to the VectorBase.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -48,7 +79,7 @@ def test_add_embedding(vector_base: VectorBase, sample_embeddings: Samples): np.testing.assert_array_equal(vector_base.serialize_embedding_at(i), embedding) -def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples): +def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Adding multiple embeddings at once matches repeated single adds.""" keys = list(sample_embeddings.keys()) for key, embedding in sample_embeddings.items(): @@ -71,7 +102,7 @@ def test_add_embeddings(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding keys to the VectorBase.""" for key in sample_embeddings: await vector_base.add_key(key) @@ -80,7 +111,9 @@ async def test_add_key(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_key_no_cache(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_key_no_cache( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test adding keys to the 
VectorBase with cache disabled.""" for key in sample_embeddings: await vector_base.add_key(key, cache=False) @@ -91,7 +124,7 @@ async def test_add_key_no_cache(vector_base: VectorBase, sample_embeddings: Samp @pytest.mark.asyncio -async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test adding multiple keys to the VectorBase.""" keys = list(sample_embeddings.keys()) await vector_base.add_keys(keys) @@ -100,7 +133,9 @@ async def test_add_keys(vector_base: VectorBase, sample_embeddings: Samples): @pytest.mark.asyncio -async def test_add_keys_no_cache(vector_base: VectorBase, sample_embeddings: Samples): +async def test_add_keys_no_cache( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test adding multiple keys to the VectorBase with cache disabled.""" keys = list(sample_embeddings.keys()) await vector_base.add_keys(keys, cache=False) @@ -111,7 +146,9 @@ async def test_add_keys_no_cache(vector_base: VectorBase, sample_embeddings: Sam @pytest.mark.asyncio -async def test_fuzzy_lookup(vector_base: VectorBase, sample_embeddings: Samples): +async def test_fuzzy_lookup( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test fuzzy lookup functionality.""" for key in sample_embeddings: await vector_base.add_key(key) @@ -122,7 +159,7 @@ async def test_fuzzy_lookup(vector_base: VectorBase, sample_embeddings: Samples) assert results[0].score > 0.9 # High similarity score for the same word -def test_clear(vector_base: VectorBase, sample_embeddings: Samples): +def test_clear(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test clearing the VectorBase.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -132,7 +169,9 @@ def test_clear(vector_base: VectorBase, sample_embeddings: Samples): assert len(vector_base) == 0 -def test_serialize_deserialize(vector_base: 
VectorBase, sample_embeddings: Samples): +def test_serialize_deserialize( + vector_base: VectorBase, sample_embeddings: Samples +) -> None: """Test serialization and deserialization of the VectorBase.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -149,12 +188,12 @@ def test_serialize_deserialize(vector_base: VectorBase, sample_embeddings: Sampl ) -def test_vectorbase_bool(vector_base: VectorBase): +def test_vectorbase_bool(vector_base: VectorBase) -> None: """__bool__ should always return True.""" assert bool(vector_base) is True -def test_get_embedding_at(vector_base: VectorBase, sample_embeddings: Samples): +def test_get_embedding_at(vector_base: VectorBase, sample_embeddings: Samples) -> None: """Test get_embedding_at returns correct embedding and raises IndexError.""" for key, embedding in sample_embeddings.items(): vector_base.add_embedding(key, embedding) @@ -169,7 +208,7 @@ def test_get_embedding_at(vector_base: VectorBase, sample_embeddings: Samples): def test_fuzzy_lookup_embedding_in_subset( vector_base: VectorBase, sample_embeddings: Samples -): +) -> None: """Test fuzzy_lookup_embedding_in_subset returns best match in subset or None.""" keys = list(sample_embeddings.keys()) for key, embedding in sample_embeddings.items(): @@ -220,3 +259,51 @@ def test_add_embeddings_wrong_ndim(vector_base: VectorBase) -> None: emb1d = np.array([0.1, 0.2, 0.3], dtype=np.float32) with pytest.raises(ValueError, match="Expected 2D"): vector_base.add_embeddings(None, emb1d) + + +@pytest.mark.parametrize( + ("model_name", "expected_min_score"), + [ + ("text-embedding-3-large", 0.07), + ("text-embedding-3-small", 0.16), + ("text-embedding-ada-002", 0.72), + ], +) +def test_text_embedding_index_settings_uses_known_model_default( + model_name: str, expected_min_score: float +) -> None: + settings = TextEmbeddingIndexSettings( + embedding_model=FakeEmbeddingModel(model_name) + ) + + assert settings.min_score == expected_min_score 
+ assert settings.max_matches is None + + +def test_text_embedding_index_settings_keeps_unknown_model_fallback() -> None: + settings = TextEmbeddingIndexSettings( + embedding_model=FakeEmbeddingModel("custom-embedding-model") + ) + + assert settings.min_score == DEFAULT_MIN_SCORE + assert settings.max_matches is None + + +def test_text_embedding_index_settings_explicit_overrides_win() -> None: + settings = TextEmbeddingIndexSettings( + embedding_model=FakeEmbeddingModel("text-embedding-3-large"), + min_score=0.55, + max_matches=7, + ) + + assert settings.min_score == 0.55 + assert settings.max_matches == 7 + + +def test_text_embedding_index_settings_invalid_max_matches_becomes_none() -> None: + settings = TextEmbeddingIndexSettings( + embedding_model=FakeEmbeddingModel("text-embedding-3-large"), + max_matches=0, + ) + + assert settings.max_matches is None diff --git a/tools/benchmark_embeddings.py b/tools/benchmark_embeddings.py new file mode 100644 index 0000000..8193896 --- /dev/null +++ b/tools/benchmark_embeddings.py @@ -0,0 +1,440 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Benchmark retrieval settings for known embedding models. + +This script evaluates the Adrian Tchaikovsky Episode 53 search dataset in +`tests/testdata/` and reports retrieval quality for combinations of +`min_score` and `max_hits`. + +The benchmark is intentionally narrow: +- It only measures retrieval against `messageMatches` ground truth. +- It is meant to help choose repository defaults for known models. +- In practice, `min_score` is the primary library default this informs. +- It does not prove universal "best" settings for every dataset. 
+ +Usage: + uv run python tools/benchmark_embeddings.py + uv run python tools/benchmark_embeddings.py --model openai:text-embedding-3-small + uv run python tools/benchmark_embeddings.py --model openai:text-embedding-3-small --min-score-start 0.01 --min-score-stop 0.20 --min-score-step 0.01 +""" + +import argparse +import asyncio +from dataclasses import dataclass +from decimal import Decimal +import json +from pathlib import Path +from statistics import mean + +from dotenv import load_dotenv + +from typeagent.aitools.embeddings import ( + IEmbeddingModel, + NormalizedEmbeddings, +) +from typeagent.aitools.model_adapters import create_embedding_model +from typeagent.aitools.vectorbase import ( + TextEmbeddingIndexSettings, + VectorBase, +) + +DEFAULT_MIN_SCORES = [score / 10 for score in range(1, 10)] +DEFAULT_MAX_HITS = [5, 10, 15, 20] +DATA_DIR = Path("tests") / "testdata" +INDEX_DATA_PATH = DATA_DIR / "Episode_53_AdrianTchaikovsky_index_data.json" +SEARCH_RESULTS_PATH = DATA_DIR / "Episode_53_Search_results.json" + + +@dataclass +class SearchQueryCase: + """A benchmark query paired with the message ordinals it should retrieve.""" + + query: str + expected_matches: list[int] + + +@dataclass +class SearchMetrics: + """Aggregate retrieval quality metrics for one benchmark row.""" + + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class TopScoreStats: + """Observed top-1 score statistics across all benchmark queries.""" + + min_top_score: float + mean_top_score: float + max_top_score: float + + +@dataclass +class BenchmarkRow: + """One `(min_score, max_hits)` configuration evaluated by the benchmark.""" + + min_score: float + max_hits: int + metrics: SearchMetrics + + +def parse_float_list(raw: str | None) -> list[float]: + """Parse explicit min-score values or fall back to the coarse default grid.""" + + if raw is None: + return DEFAULT_MIN_SCORES + values = [float(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise 
ValueError("--min-scores must contain at least one value") + return values + + +def build_float_range(start: float, stop: float, step: float) -> list[float]: + """Build an inclusive decimal-safe float range for score sweeps.""" + + if step <= 0: + raise ValueError("--min-score-step must be positive") + if start > stop: + raise ValueError("--min-score-start must be <= --min-score-stop") + + start_decimal = Decimal(str(start)) + stop_decimal = Decimal(str(stop)) + step_decimal = Decimal(str(step)) + values: list[float] = [] + current = start_decimal + while current <= stop_decimal: + values.append(float(current)) + current += step_decimal + return values + + +def resolve_min_scores( + raw: str | None, + start: float | None, + stop: float | None, + step: float | None, +) -> list[float]: + """Resolve the benchmark min-score grid from explicit values or a generated range.""" + + range_args = [start, stop, step] + using_range = any(value is not None for value in range_args) + if using_range: + if raw is not None: + raise ValueError( + "Use either --min-scores or the --min-score-start/stop/step range" + ) + if any(value is None for value in range_args): + raise ValueError( + "--min-score-start, --min-score-stop, and --min-score-step must all be set together" + ) + assert start is not None + assert stop is not None + assert step is not None + return build_float_range(start, stop, step) + return parse_float_list(raw) + + +def parse_int_list(raw: str | None) -> list[int]: + """Parse positive integer arguments such as `max_hits` grids.""" + + if raw is None: + return DEFAULT_MAX_HITS + values = [int(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise ValueError("--max-hits must contain at least one value") + if any(value <= 0 for value in values): + raise ValueError("--max-hits values must be positive integers") + return values + + +def load_message_texts(repo_root: Path) -> list[str]: + """Load the benchmark corpus as one text blob per 
message.""" + + index_data = json.loads((repo_root / INDEX_DATA_PATH).read_text(encoding="utf-8")) + messages = index_data["messages"] + return [" ".join(message.get("textChunks", [])) for message in messages] + + +def load_search_queries(repo_root: Path) -> list[SearchQueryCase]: + """Load benchmark queries that include message-level ground-truth matches.""" + + search_data = json.loads( + (repo_root / SEARCH_RESULTS_PATH).read_text(encoding="utf-8") + ) + cases: list[SearchQueryCase] = [] + for item in search_data: + search_text = item.get("searchText") + results = item.get("results", []) + if not search_text or not results: + continue + expected_matches = results[0].get("messageMatches", []) + if not expected_matches: + continue + cases.append(SearchQueryCase(search_text, expected_matches)) + return cases + + +async def build_vector_base( + model_spec: str | None, + message_texts: list[str], + batch_size: int, +) -> tuple[IEmbeddingModel, VectorBase]: + """Build a message-level vector index for the benchmark corpus.""" + + model = create_embedding_model(model_spec) + settings = TextEmbeddingIndexSettings( + embedding_model=model, + min_score=0.0, + max_matches=None, + batch_size=batch_size, + ) + vector_base = VectorBase(settings) + for start in range(0, len(message_texts), batch_size): + batch = message_texts[start : start + batch_size] + await vector_base.add_keys(batch) + + return model, vector_base + + +def evaluate_search_queries( + vector_base: VectorBase, + query_cases: list[SearchQueryCase], + query_embeddings: NormalizedEmbeddings, + min_score: float, + max_hits: int, +) -> SearchMetrics: + """Evaluate one benchmark row over every labeled query.""" + + hit_count = 0 + reciprocal_ranks: list[float] = [] + + for case, query_embedding in zip(query_cases, query_embeddings): + scored_results = vector_base.fuzzy_lookup_embedding( + query_embedding, + max_hits=max_hits, + min_score=min_score, + ) + rank = 0 + for result_index, scored_result in 
enumerate(scored_results, start=1): + if scored_result.item in case.expected_matches: + rank = result_index + break + if rank > 0: + hit_count += 1 + reciprocal_ranks.append(1.0 / rank) + else: + reciprocal_ranks.append(0.0) + + return SearchMetrics( + hit_rate=(hit_count / len(query_cases)) * 100, + mean_reciprocal_rank=mean(reciprocal_ranks), + ) + + +def measure_top_score_stats( + vector_base: VectorBase, + query_embeddings: NormalizedEmbeddings, +) -> TopScoreStats: + """Measure the achievable top-1 score range for the current model and corpus.""" + + top_scores: list[float] = [] + for query_embedding in query_embeddings: + scored_results = vector_base.fuzzy_lookup_embedding( + query_embedding, + max_hits=1, + min_score=0.0, + ) + top_scores.append(scored_results[0].score if scored_results else 0.0) + + return TopScoreStats( + min_top_score=min(top_scores), + mean_top_score=mean(top_scores), + max_top_score=max(top_scores), + ) + + +def filter_min_scores_by_ceiling( + min_scores: list[float], max_top_score: float +) -> tuple[list[float], list[float]]: + """Discard score thresholds that cannot return any results for this run.""" + + effective_scores = [ + min_score for min_score in min_scores if min_score <= max_top_score + 1e-9 + ] + skipped_scores = [ + min_score for min_score in min_scores if min_score > max_top_score + 1e-9 + ] + return effective_scores, skipped_scores + + +def select_best_row(rows: list[BenchmarkRow]) -> BenchmarkRow: + """Prefer the strongest MRR/Hit Rate row, then the stricter score cutoff.""" + + return max( + rows, + key=lambda row: ( + row.metrics.mean_reciprocal_rank, + row.metrics.hit_rate, + row.min_score, + -row.max_hits, + ), + ) + + +def print_rows(rows: list[BenchmarkRow]) -> None: + """Print the benchmark grid in a reviewer-friendly table.""" + + print("=" * 72) + print("SEARCH BENCHMARK (Episode 53 messageMatches ground truth)") + print("=" * 72) + print(f"{'Min Score':<12} | {'Max Hits':<10} | {'Hit Rate (%)':<15} | 
{'MRR':<10}") + print("-" * 65) + for row in rows: + print( + f"{row.min_score:<12.2f} | {row.max_hits:<10d} | " + f"{row.metrics.hit_rate:<15.2f} | " + f"{row.metrics.mean_reciprocal_rank:<10.4f}" + ) + print("-" * 65) + + +async def run_benchmark( + model_spec: str | None, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, +) -> None: + """Run a single benchmark sweep and print the evaluated grid.""" + + load_dotenv() + + repo_root = Path(__file__).resolve().parent.parent + message_texts = load_message_texts(repo_root) + query_cases = load_search_queries(repo_root) + if not query_cases: + raise ValueError("No search queries with messageMatches found in the dataset") + model, vector_base = await build_vector_base( + model_spec, + message_texts, + batch_size, + ) + query_embeddings = await model.get_embeddings([case.query for case in query_cases]) + top_score_stats = measure_top_score_stats(vector_base, query_embeddings) + effective_min_scores, skipped_min_scores = filter_min_scores_by_ceiling( + min_scores, + top_score_stats.max_top_score, + ) + if not effective_min_scores: + raise ValueError( + "No requested min_score values are below the observed top-score ceiling " + f"of {top_score_stats.max_top_score:.4f}" + ) + + rows: list[BenchmarkRow] = [] + for min_score in effective_min_scores: + for max_hits in max_hits_values: + metrics = evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, + ) + rows.append(BenchmarkRow(min_score, max_hits, metrics)) + + print(f"Model: {model.model_name}") + print(f"Messages indexed: {len(message_texts)}") + print(f"Queries evaluated: {len(query_cases)}") + print( + "Observed top-1 score range: " + f"{top_score_stats.min_top_score:.4f}..{top_score_stats.max_top_score:.4f} " + f"(mean {top_score_stats.mean_top_score:.4f})" + ) + if skipped_min_scores: + print( + f"Skipped {len(skipped_min_scores)} min_score values above " + f"{top_score_stats.max_top_score:.4f}; 
they cannot return any matches." + ) + print() + print_rows(rows) + + best_row = select_best_row(rows) + print() + print("Best-scoring benchmark row:") + print(f" min_score={best_row.min_score:.2f}") + print(f" max_hits={best_row.max_hits}") + print(f" hit_rate={best_row.metrics.hit_rate:.2f}%") + print(f" mrr={best_row.metrics.mean_reciprocal_rank:.4f}") + + +def main() -> None: + """Parse CLI arguments and run the benchmark once.""" + + parser = argparse.ArgumentParser( + description="Benchmark retrieval settings for an embedding model." + ) + parser.add_argument( + "--model", + type=str, + default=None, + help="Provider and model name, e.g. 'openai:text-embedding-3-small'", + ) + parser.add_argument( + "--min-scores", + type=str, + default=None, + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--min-score-start", + type=float, + default=None, + help="Inclusive start of a generated min_score range.", + ) + parser.add_argument( + "--min-score-stop", + type=float, + default=None, + help="Inclusive end of a generated min_score range.", + ) + parser.add_argument( + "--min-score-step", + type=float, + default=None, + help="Step size for a generated min_score range.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=None, + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the index.", + ) + args = parser.parse_args() + if args.batch_size <= 0: + raise ValueError("--batch-size must be a positive integer") + + asyncio.run( + run_benchmark( + model_spec=args.model, + min_scores=resolve_min_scores( + args.min_scores, + args.min_score_start, + args.min_score_stop, + args.min_score_step, + ), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tools/repeat_embedding_benchmarks.py b/tools/repeat_embedding_benchmarks.py new file mode 
100644 index 0000000..bc2a741 --- /dev/null +++ b/tools/repeat_embedding_benchmarks.py @@ -0,0 +1,399 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Run embedding benchmarks repeatedly and save raw/summary JSON results. + +This script runs `tools/benchmark_embeddings.py` logic multiple times for each +embedding model, stores every run as JSON, and writes aggregate summaries that +can be used to justify tuned defaults. + +Usage: + uv run python tools/repeat_embedding_benchmarks.py + uv run python tools/repeat_embedding_benchmarks.py --runs 30 + uv run python tools/repeat_embedding_benchmarks.py --models openai:text-embedding-3-small,openai:text-embedding-3-large,openai:text-embedding-ada-002 + uv run python tools/repeat_embedding_benchmarks.py --models openai:text-embedding-3-small --min-score-start 0.01 --min-score-stop 0.20 --min-score-step 0.01 +""" + +import argparse +import asyncio +from dataclasses import asdict, dataclass +from datetime import datetime, UTC +import json +from pathlib import Path +from statistics import mean + +import benchmark_embeddings +from dotenv import load_dotenv + +BenchmarkRow = benchmark_embeddings.BenchmarkRow +DEFAULT_MAX_HITS = benchmark_embeddings.DEFAULT_MAX_HITS +parse_int_list = benchmark_embeddings.parse_int_list +resolve_min_scores = benchmark_embeddings.resolve_min_scores + +DEFAULT_MODELS = [ + "openai:text-embedding-3-small", + "openai:text-embedding-3-large", + "openai:text-embedding-ada-002", +] +DEFAULT_OUTPUT_DIR = Path("benchmark_results") + + +@dataclass +class RunRow: + """Serialized benchmark row for one repeated run.""" + + min_score: float + max_hits: int + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class RunResult: + """All measurements captured for one benchmark repetition.""" + + run_index: int + model_spec: str + resolved_model_name: str + message_count: int + query_count: int + min_top_score: float + mean_top_score: float + max_top_score: float + 
rows: list[RunRow] + best_row: RunRow + + +def sanitize_model_name(model_spec: str) -> str: + """Convert a model spec into a filesystem-safe directory name.""" + + return model_spec.replace(":", "__").replace("/", "_").replace("\\", "_") + + +def benchmark_row_to_run_row(row: BenchmarkRow) -> RunRow: + """Flatten a benchmark row into the JSON-friendly repeated-run shape.""" + + return RunRow( + min_score=row.min_score, + max_hits=row.max_hits, + hit_rate=row.metrics.hit_rate, + mean_reciprocal_rank=row.metrics.mean_reciprocal_rank, + ) + + +def summarize_runs(model_spec: str, runs: list[RunResult]) -> dict[str, object]: + """Average repeated benchmark runs into a per-model summary payload.""" + + summary_rows: dict[tuple[float, int], list[RunRow]] = {} + for run in runs: + for row in run.rows: + summary_rows.setdefault((row.min_score, row.max_hits), []).append(row) + + averaged_rows: list[dict[str, float | int]] = [] + for (min_score, max_hits), rows in sorted(summary_rows.items()): + averaged_rows.append( + { + "min_score": min_score, + "max_hits": max_hits, + "mean_hit_rate": mean(row.hit_rate for row in rows), + "mean_mrr": mean(row.mean_reciprocal_rank for row in rows), + } + ) + + best_rows = [run.best_row for run in runs] + best_min_score_counts: dict[str, int] = {} + best_max_hits_counts: dict[str, int] = {} + for row in best_rows: + best_min_score_counts[f"{row.min_score:.2f}"] = ( + best_min_score_counts.get(f"{row.min_score:.2f}", 0) + 1 + ) + best_max_hits_counts[str(row.max_hits)] = ( + best_max_hits_counts.get(str(row.max_hits), 0) + 1 + ) + + averaged_best_row = max( + averaged_rows, + key=lambda row: ( + float(row["mean_mrr"]), + float(row["mean_hit_rate"]), + float(row["min_score"]), + -int(row["max_hits"]), + ), + ) + + return { + "model_spec": model_spec, + "resolved_model_name": runs[0].resolved_model_name, + "run_count": len(runs), + "message_count": runs[0].message_count, + "query_count": runs[0].query_count, + "min_top_score": 
mean(run.min_top_score for run in runs), + "mean_top_score": mean(run.mean_top_score for run in runs), + "max_top_score": mean(run.max_top_score for run in runs), + "candidate_rows": averaged_rows, + "recommended_row": averaged_best_row, + "best_min_score_counts": best_min_score_counts, + "best_max_hits_counts": best_max_hits_counts, + } + + +def write_json(path: Path, data: object) -> None: + """Write a JSON artifact with stable indentation for review and reuse.""" + + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + + +def write_markdown_summary(path: Path, summaries: list[dict[str, object]]) -> None: + """Write the reviewer-facing markdown summary for all benchmarked models.""" + + lines = [ + "# Repeated Embedding Benchmark Summary", + "", + "| Model | Runs | Recommended min_score | Recommended max_hits | Mean hit rate | Mean MRR |", + "| --- | ---: | ---: | ---: | ---: | ---: |", + ] + for summary in summaries: + recommended_row = summary["recommended_row"] + assert isinstance(recommended_row, dict) + lines.append( + "| " + f"{summary['resolved_model_name']} | " + f"{summary['run_count']} | " + f"{recommended_row['min_score']:.2f} | " + f"{recommended_row['max_hits']} | " + f"{recommended_row['mean_hit_rate']:.2f} | " + f"{recommended_row['mean_mrr']:.4f} |" + ) + lines.append("") + for summary in summaries: + lines.append( + f"- {summary['resolved_model_name']}: observed top-1 score range " + f"{summary['min_top_score']:.4f}..{summary['max_top_score']:.4f} " + f"(mean {summary['mean_top_score']:.4f})." 
+ ) + lines.append("") + path.write_text("\n".join(lines), encoding="utf-8") + + +async def run_single_model_benchmark( + model_spec: str, + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_dir: Path, +) -> dict[str, object]: + """Run the benchmark repeatedly for one model and persist raw artifacts.""" + + repo_root = Path(__file__).resolve().parent.parent + message_texts = benchmark_embeddings.load_message_texts(repo_root) + query_cases = benchmark_embeddings.load_search_queries(repo_root) + model_output_dir = output_dir / sanitize_model_name(model_spec) + model_output_dir.mkdir(parents=True, exist_ok=True) + + run_results: list[RunResult] = [] + for run_index in range(1, runs + 1): + model, vector_base = await benchmark_embeddings.build_vector_base( + model_spec, + message_texts, + batch_size, + ) + query_embeddings = await model.get_embeddings( + [case.query for case in query_cases] + ) + top_score_stats = benchmark_embeddings.measure_top_score_stats( + vector_base, + query_embeddings, + ) + effective_min_scores, skipped_min_scores = ( + benchmark_embeddings.filter_min_scores_by_ceiling( + min_scores, + top_score_stats.max_top_score, + ) + ) + if not effective_min_scores: + raise ValueError( + "No requested min_score values are below the observed top-score ceiling " + f"of {top_score_stats.max_top_score:.4f} for {model.model_name}" + ) + if skipped_min_scores: + print( + f"Skipping {len(skipped_min_scores)} min_score values above " + f"{top_score_stats.max_top_score:.4f} for {model.model_name}" + ) + benchmark_rows: list[benchmark_embeddings.BenchmarkRow] = [] + for min_score in effective_min_scores: + for max_hits in max_hits_values: + metrics = benchmark_embeddings.evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, + ) + benchmark_rows.append( + benchmark_embeddings.BenchmarkRow(min_score, max_hits, metrics) + ) + + best_row = 
benchmark_embeddings.select_best_row(benchmark_rows) + run_result = RunResult( + run_index=run_index, + model_spec=model_spec, + resolved_model_name=model.model_name, + message_count=len(message_texts), + query_count=len(query_cases), + min_top_score=top_score_stats.min_top_score, + mean_top_score=top_score_stats.mean_top_score, + max_top_score=top_score_stats.max_top_score, + rows=[benchmark_row_to_run_row(row) for row in benchmark_rows], + best_row=benchmark_row_to_run_row(best_row), + ) + run_results.append(run_result) + write_json(model_output_dir / f"run_{run_index:02d}.json", asdict(run_result)) + + summary = summarize_runs(model_spec, run_results) + write_json(model_output_dir / "summary.json", summary) + return summary + + +async def run_repeated_benchmarks( + models: list[str], + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_root: Path, +) -> Path: + """Run the benchmark suite for each requested model and save the artifacts.""" + + timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") + output_dir = output_root / timestamp + output_dir.mkdir(parents=True, exist_ok=True) + + metadata = { + "created_at_utc": timestamp, + "runs_per_model": runs, + "models": models, + "min_scores": min_scores, + "max_hits_values": max_hits_values, + "batch_size": batch_size, + } + write_json(output_dir / "metadata.json", metadata) + + summaries: list[dict[str, object]] = [] + for model_spec in models: + print(f"Running {runs} benchmark iterations for {model_spec}...") + summary = await run_single_model_benchmark( + model_spec=model_spec, + runs=runs, + min_scores=min_scores, + max_hits_values=max_hits_values, + batch_size=batch_size, + output_dir=output_dir, + ) + summaries.append(summary) + + write_json(output_dir / "summary.json", summaries) + write_markdown_summary(output_dir / "summary.md", summaries) + return output_dir + + +def parse_models(raw: str | None) -> list[str]: + """Parse the model list or fall back to the 
built-in OpenAI benchmark set.""" + + if raw is None: + return DEFAULT_MODELS + models = [item.strip() for item in raw.split(",") if item.strip()] + if not models: + raise ValueError("--models must contain at least one model") + return models + + +def main() -> None: + """Parse CLI arguments and run repeated embedding benchmarks.""" + + parser = argparse.ArgumentParser( + description="Run embedding benchmarks repeatedly and save JSON results." + ) + parser.add_argument( + "--models", + type=str, + default=None, + help="Comma-separated model specs to benchmark.", + ) + parser.add_argument( + "--runs", + type=int, + default=30, + help="Number of repeated runs per model.", + ) + parser.add_argument( + "--min-scores", + type=str, + default=None, + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--min-score-start", + type=float, + default=None, + help="Inclusive start of a generated min_score range.", + ) + parser.add_argument( + "--min-score-stop", + type=float, + default=None, + help="Inclusive end of a generated min_score range.", + ) + parser.add_argument( + "--min-score-step", + type=float, + default=None, + help="Step size for a generated min_score range.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=",".join(str(value) for value in DEFAULT_MAX_HITS), + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the index.", + ) + parser.add_argument( + "--output-dir", + type=str, + default=str(DEFAULT_OUTPUT_DIR), + help="Directory where benchmark results will be written.", + ) + args = parser.parse_args() + + if args.runs <= 0: + raise ValueError("--runs must be a positive integer") + if args.batch_size <= 0: + raise ValueError("--batch-size must be a positive integer") + + load_dotenv() + output_dir = asyncio.run( + run_repeated_benchmarks( + models=parse_models(args.models), + runs=args.runs, + 
min_scores=resolve_min_scores( + args.min_scores, + args.min_score_start, + args.min_score_stop, + args.min_score_step, + ), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + output_root=Path(args.output_dir), + ) + ) + print(f"Wrote benchmark results to {output_dir}") + + +if __name__ == "__main__": + main()