From 7053050adb22b5d004a492bae6f2ecb756191e93 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Sat, 22 Nov 2025 23:15:25 +0000 Subject: [PATCH 1/2] (feat) Add field collapse support for search result deduplication This implements Elasticsearch-style field collapsing to deduplicate search results based on a field value. This is useful for scenarios like document chunking where multiple indexed chunks should be represented by only the top-scoring chunk per document. - Collapse results by any stored field with doc values - Returns highest-scoring document per unique field value - Null/missing values grouped together --- index_impl.go | 5 + search.go | 15 +++ search/collector/topn.go | 81 +++++++++++++++- search/collector/topn_test.go | 172 ++++++++++++++++++++++++++++++++++ search_knn.go | 4 + search_no_knn.go | 4 + 6 files changed, 279 insertions(+), 2 deletions(-) diff --git a/index_impl.go b/index_impl.go index a43b3cf75..9bc8770c6 100644 --- a/index_impl.go +++ b/index_impl.go @@ -674,6 +674,11 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr coll = collector.NewTopNCollector(req.Size, req.From, req.Sort) } + // Enable field collapsing if requested + if req.Collapse != nil && req.Collapse.Field != "" { + coll.SetCollapse(req.Collapse.Field) + } + var knnHits []*search.DocumentMatch var skipKNNCollector bool diff --git a/search.go b/search.go index e3736558a..bf28a138d 100644 --- a/search.go +++ b/search.go @@ -141,6 +141,21 @@ type numericRange struct { Max *float64 `json:"max,omitempty"` } +// CollapseRequest describes field collapsing for search results. +// Collapses search results based on a field value, returning only +// the top-scoring document per unique field value. +// Similar to Elasticsearch's collapse feature. +type CollapseRequest struct { + Field string `json:"field"` +} + +// NewCollapseRequest creates a new collapse request for the specified field. 
+func NewCollapseRequest(field string) *CollapseRequest { + return &CollapseRequest{ + Field: field, + } +} + // A FacetRequest describes a facet or aggregation // of the result document set you would like to be // built. diff --git a/search/collector/topn.go b/search/collector/topn.go index 739dd8348..0114230e9 100644 --- a/search/collector/topn.go +++ b/search/collector/topn.go @@ -79,6 +79,11 @@ type TopNCollector struct { knnHits map[string]*search.DocumentMatch computeNewScoreExpl search.ScoreExplCorrectionCallbackFunc + + // Collapse support + collapseField string + collapseGroups map[string]*search.DocumentMatch // field_value -> best doc for that value + collapseFieldValue string // current document's collapse field value } // CheckDoneEvery controls how frequently we check the context deadline @@ -262,6 +267,10 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, hc.facetsBuilder.UpdateVisitor(field, term) } hc.sort.UpdateVisitor(field, term) + // Capture collapse field value + if hc.collapseField != "" && field == hc.collapseField { + hc.collapseFieldValue = string(term) + } } dmHandlerMaker := MakeTopNDocumentMatchHandler @@ -376,6 +385,9 @@ func (hc *TopNCollector) adjustDocumentMatch(ctx *search.SearchContext, func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext, reader index.IndexReader, d *search.DocumentMatch, isKnnDoc bool) (err error) { + // Reset collapse field value for each document + hc.collapseFieldValue = "" + // visit field terms for features that require it (sort, facets) if !isKnnDoc && len(hc.neededFields) > 0 { err = hc.visitFieldTerms(reader, d, hc.updateFieldVisitor) @@ -465,6 +477,29 @@ func MakeTopNDocumentMatchHandler( } } + // Handle field collapsing - deduplicate by field value + // Only proceed if this is the best document for its collapse key so far + if hc.collapseField != "" { + collapseKey := hc.collapseFieldValue + // Following Elasticsearch behavior: treat missing values as 
empty string + // and group them together + if existingDoc, seen := hc.collapseGroups[collapseKey]; seen { + // We've seen this collapse key before, check if new doc is better + cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, existingDoc) + if cmp >= 0 { + // Existing doc is better or equal, skip this one + ctx.DocumentMatchPool.Put(d) + return nil + } + // New doc is better, update the tracking map + // The old doc will eventually fall out of the top N naturally + hc.collapseGroups[collapseKey] = d + } else { + // First time seeing this collapse key, track it + hc.collapseGroups[collapseKey] = d + } + } + removed := hc.store.AddNotExceedingSize(d, hc.size+hc.skip) if removed != nil { if hc.lowestMatchOutsideResults == nil { @@ -535,7 +570,7 @@ func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) { // and does final doc id lookup (if necessary) func (hc *TopNCollector) finalizeResults(r index.IndexReader) error { var err error - hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error { + results, err := hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error { if doc.ID == "" { // look up the id since we need it for lookup var err error @@ -547,8 +582,31 @@ func (hc *TopNCollector) finalizeResults(r index.IndexReader) error { doc.Complete(nil) return nil }) + if err != nil { + return err + } - return err + // If collapse is enabled, filter results to only include the best document per collapse key + if hc.collapseField != "" { + // Create a set of documents that are in collapseGroups (i.e., current best for their key) + bestDocs := make(map[*search.DocumentMatch]bool) + for _, doc := range hc.collapseGroups { + bestDocs[doc] = true + } + + // Filter results to only include documents in bestDocs + filtered := make(search.DocumentMatchCollection, 0, len(results)) + for _, doc := range results { + if bestDocs[doc] { + filtered = append(filtered, doc) + } + } + hc.results = filtered + } else { + 
hc.results = results + } + + return nil } // Results returns the collected hits @@ -586,3 +644,22 @@ func (hc *TopNCollector) SetKNNHits(knnHits search.DocumentMatchCollection, newS } hc.computeNewScoreExpl = newScoreExplComputer } + +// SetCollapse enables field collapsing on the collector. +// The collapse field must be a stored field with doc values. +func (hc *TopNCollector) SetCollapse(field string) { + hc.collapseField = field + hc.collapseGroups = make(map[string]*search.DocumentMatch) + + // Add collapse field to needed fields for doc value reading + found := false + for _, f := range hc.neededFields { + if f == field { + found = true + break + } + } + if !found { + hc.neededFields = append(hc.neededFields, field) + } +} diff --git a/search/collector/topn_test.go b/search/collector/topn_test.go index db3904388..1f3ce1117 100644 --- a/search/collector/topn_test.go +++ b/search/collector/topn_test.go @@ -1114,3 +1114,175 @@ func BenchmarkTop10000of1000000Scores(b *testing.B) { return NewTopNCollector(10000, 0, search.SortOrder{&search.SortScore{Desc: true}}) }, b) } + +// Test field collapsing - deduplicate results by field value +func TestCollapse(t *testing.T) { + // Create test data: multiple chunks per document + // doc1: chunks a (score 10), b (score 20), c (score 5) + // doc2: chunks d (score 15), e (score 8) + // doc3: chunk f (score 25) + // With collapse on document_id, we should get: + // - doc1 represented by chunk b (score 20) + // - doc2 represented by chunk d (score 15) + // - doc3 represented by chunk f (score 25) + // Sorted by score desc: f(25), b(20), d(15) + + fieldValues := map[string]string{ + "a": "doc1", + "b": "doc1", + "c": "doc1", + "d": "doc2", + "e": "doc2", + "f": "doc3", + } + + searcher := &stubSearcher{ + matches: []*search.DocumentMatch{ + {IndexInternalID: index.IndexInternalID("a"), Score: 10}, + {IndexInternalID: index.IndexInternalID("b"), Score: 20}, + {IndexInternalID: index.IndexInternalID("c"), Score: 5}, + 
{IndexInternalID: index.IndexInternalID("d"), Score: 15}, + {IndexInternalID: index.IndexInternalID("e"), Score: 8}, + {IndexInternalID: index.IndexInternalID("f"), Score: 25}, + }, + } + + reader := &stubReaderWithCollapse{ + fieldValues: fieldValues, + fieldName: "document_id", + } + + collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) + collector.SetCollapse("document_id") + + err := collector.Collect(context.Background(), searcher, reader) + if err != nil { + t.Fatal(err) + } + + total := collector.Total() + if total != 6 { + t.Errorf("expected 6 total hits, got %d", total) + } + + results := collector.Results() + + // Should get 3 results (one per unique document_id) + if len(results) != 3 { + t.Fatalf("expected 3 results after collapse, got %d", len(results)) + } + + // Check that we got the highest scoring chunk per document + // Result 0: f (doc3, score 25) + if results[0].ID != "f" { + t.Errorf("expected first result ID 'f', got %s", results[0].ID) + } + if results[0].Score != 25 { + t.Errorf("expected first result score 25, got %f", results[0].Score) + } + + // Result 1: b (doc1, score 20) + if results[1].ID != "b" { + t.Errorf("expected second result ID 'b', got %s", results[1].ID) + } + if results[1].Score != 20 { + t.Errorf("expected second result score 20, got %f", results[1].Score) + } + + // Result 2: d (doc2, score 15) + if results[2].ID != "d" { + t.Errorf("expected third result ID 'd', got %s", results[2].ID) + } + if results[2].Score != 15 { + t.Errorf("expected third result score 15, got %f", results[2].Score) + } +} + +// Test collapse with missing field values +func TestCollapseWithMissingValues(t *testing.T) { + // doc1: chunk a (score 10) + // doc2: chunk b (score 20) + // no doc_id: chunks c (score 15), d (score 8) + // With collapse, missing values should be grouped together + + fieldValues := map[string]string{ + "a": "doc1", + "b": "doc2", + // c and d have no document_id field + } + + searcher := 
&stubSearcher{ + matches: []*search.DocumentMatch{ + {IndexInternalID: index.IndexInternalID("a"), Score: 10}, + {IndexInternalID: index.IndexInternalID("b"), Score: 20}, + {IndexInternalID: index.IndexInternalID("c"), Score: 15}, + {IndexInternalID: index.IndexInternalID("d"), Score: 8}, + }, + } + + reader := &stubReaderWithCollapse{ + fieldValues: fieldValues, + fieldName: "document_id", + } + + collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) + collector.SetCollapse("document_id") + + err := collector.Collect(context.Background(), searcher, reader) + if err != nil { + t.Fatal(err) + } + + results := collector.Results() + + // Should get 3 results: + // - doc2 (b, score 20) + // - missing group (c, score 15) + // - doc1 (a, score 10) + if len(results) != 3 { + t.Fatalf("expected 3 results after collapse, got %d", len(results)) + } + + // Verify scores are in descending order + if results[0].Score != 20 || results[1].Score != 15 || results[2].Score != 10 { + t.Errorf("results not properly sorted: scores %f, %f, %f", + results[0].Score, results[1].Score, results[2].Score) + } +} + +// stubReaderWithCollapse is a stub reader that can return collapse field values +type stubReaderWithCollapse struct { + stubReader + fieldValues map[string]string + fieldName string +} + +func (sr *stubReaderWithCollapse) DocumentVisitFieldTerms(id index.IndexInternalID, fields []string, visitor index.DocValueVisitor) error { + idStr := string(id) + for _, field := range fields { + if field == sr.fieldName { + if value, ok := sr.fieldValues[idStr]; ok { + visitor(field, []byte(value)) + } + // If not in map, field is missing (empty value will be used) + } + } + return nil +} + +func (sr *stubReaderWithCollapse) DocValueReader(fields []string) (index.DocValueReader, error) { + return &collapseDocValueReader{sr: sr, fields: fields}, nil +} + +type collapseDocValueReader struct { + sr *stubReaderWithCollapse + fields []string +} + +func (dvr 
*collapseDocValueReader) VisitDocValues(id index.IndexInternalID, visitor index.DocValueVisitor) error { + return dvr.sr.DocumentVisitFieldTerms(id, dvr.fields, visitor) +} + +func (dvr *collapseDocValueReader) BytesRead() uint64 { + return 0 +} diff --git a/search_knn.go b/search_knn.go index 73be6f5d5..02078bbe3 100644 --- a/search_knn.go +++ b/search_knn.go @@ -51,6 +51,7 @@ type SearchRequest struct { Score string `json:"score,omitempty"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` + Collapse *CollapseRequest `json:"collapse,omitempty"` KNN []*KNNRequest `json:"knn"` KNNOperator knnOperator `json:"knn_operator"` @@ -147,6 +148,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { Score string `json:"score"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` + Collapse *CollapseRequest `json:"collapse"` KNN []*tempKNNReq `json:"knn"` KNNOperator knnOperator `json:"knn_operator"` PreSearchData json.RawMessage `json:"pre_search_data"` @@ -180,6 +182,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { r.Score = temp.Score r.SearchAfter = temp.SearchAfter r.SearchBefore = temp.SearchBefore + r.Collapse = temp.Collapse r.Query, err = query.ParseQuery(temp.Q) if err != nil { return err @@ -259,6 +262,7 @@ func copySearchRequest(req *SearchRequest, preSearchData map[string]interface{}) Score: req.Score, SearchAfter: req.SearchAfter, SearchBefore: req.SearchBefore, + Collapse: req.Collapse, KNN: req.KNN, KNNOperator: req.KNNOperator, PreSearchData: preSearchData, diff --git a/search_no_knn.go b/search_no_knn.go index 172f258ec..0c13d86a3 100644 --- a/search_no_knn.go +++ b/search_no_knn.go @@ -64,6 +64,7 @@ type SearchRequest struct { Score string `json:"score,omitempty"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` + Collapse *CollapseRequest `json:"collapse,omitempty"` // PreSearchData will be a map that will be used // in 
the second phase of any 2-phase search, to provide additional @@ -98,6 +99,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { Score string `json:"score"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` + Collapse *CollapseRequest `json:"collapse"` PreSearchData json.RawMessage `json:"pre_search_data"` Params json.RawMessage `json:"params"` } @@ -129,6 +131,7 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { r.Score = temp.Score r.SearchAfter = temp.SearchAfter r.SearchBefore = temp.SearchBefore + r.Collapse = temp.Collapse r.Query, err = query.ParseQuery(temp.Q) if err != nil { return err @@ -184,6 +187,7 @@ func copySearchRequest(req *SearchRequest, preSearchData map[string]interface{}) Score: req.Score, SearchAfter: req.SearchAfter, SearchBefore: req.SearchBefore, + Collapse: req.Collapse, PreSearchData: preSearchData, } return &rv From 3c51a8a258a28c7b9e516d6b4666a756f33ab7b4 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Mon, 24 Nov 2025 14:43:01 -0800 Subject: [PATCH 2/2] (feat) Add Elasticsearch-compatible field collapse with inner_hits support Enables document chunking workflows where each document is split into multiple searchable chunks, but you want to deduplicate results while still showing multiple relevant chunks per document. 
- Returns one representative per unique field value (e.g., document_id) - InnerHits retrieve additional documents from each group with independent sorting (e.g., rank groups by relevance, show most recent chunks within each group) - Multiple named inner_hits configurations per collapse - Pagination (from/size) within groups - Second-level collapse for hierarchical grouping Example use cases: - RAG systems: Return top documents, show multiple relevant chunks per document - E-commerce: Group product variants, show different colors/sizes - Search results: Deduplicate by domain, show multiple pages per site Implements hierarchical response structure matching Elasticsearch's collapse feature with full inner_hits support. --- index_impl.go | 2 +- search.go | 15 --- search/collector/topn.go | 231 +++++++++++++++++++++++++++------- search/collector/topn_test.go | 193 +++++++++++++++++++++++++++- search/search.go | 56 +++++++++ search_knn.go | 2 +- search_no_knn.go | 14 +-- 7 files changed, 445 insertions(+), 68 deletions(-) diff --git a/index_impl.go b/index_impl.go index 9bc8770c6..fa4fdc854 100644 --- a/index_impl.go +++ b/index_impl.go @@ -676,7 +676,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr // Enable field collapsing if requested if req.Collapse != nil && req.Collapse.Field != "" { - coll.SetCollapse(req.Collapse.Field) + coll.SetCollapse(req.Collapse) } var knnHits []*search.DocumentMatch diff --git a/search.go b/search.go index bf28a138d..e3736558a 100644 --- a/search.go +++ b/search.go @@ -141,21 +141,6 @@ type numericRange struct { Max *float64 `json:"max,omitempty"` } -// CollapseRequest describes field collapsing for search results. -// Collapses search results based on a field value, returning only -// the top-scoring document per unique field value. -// Similar to Elasticsearch's collapse feature. 
-type CollapseRequest struct { - Field string `json:"field"` -} - -// NewCollapseRequest creates a new collapse request for the specified field. -func NewCollapseRequest(field string) *CollapseRequest { - return &CollapseRequest{ - Field: field, - } -} - // A FacetRequest describes a facet or aggregation // of the result document set you would like to be // built. diff --git a/search/collector/topn.go b/search/collector/topn.go index 0114230e9..eab9aa2c2 100644 --- a/search/collector/topn.go +++ b/search/collector/topn.go @@ -17,6 +17,7 @@ package collector import ( "context" "reflect" + "sort" "strconv" "time" @@ -53,6 +54,12 @@ type collectorCompare func(i, j *search.DocumentMatch) int type collectorFixup func(d *search.DocumentMatch) error +// collapseGroup tracks documents for a single collapsed group +type collapseGroup struct { + representative *search.DocumentMatch // the representative document (selected by main sort) + innerHits map[string][]*search.DocumentMatch // inner_hits name -> documents +} + // TopNCollector collects the top N hits, optionally skipping some results type TopNCollector struct { size int @@ -80,10 +87,18 @@ type TopNCollector struct { knnHits map[string]*search.DocumentMatch computeNewScoreExpl search.ScoreExplCorrectionCallbackFunc - // Collapse support - collapseField string - collapseGroups map[string]*search.DocumentMatch // field_value -> best doc for that value - collapseFieldValue string // current document's collapse field value + // Collapse support - new architecture supporting inner_hits + collapseRequest *search.CollapseRequest // full collapse configuration + collapseGroups map[string]*collapseGroup // field_value -> group state + collapseFieldValue string // current document's collapse field value + collapseInnerHitsSorts map[string]*sortContext // inner_hits name -> cached sort context +} + +// sortContext holds cached sort comparison data for a specific sort order +type sortContext struct { + sort search.SortOrder + 
scoring []bool + desc []bool } // CheckDoneEvery controls how frequently we check the context deadline @@ -268,7 +283,7 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, } hc.sort.UpdateVisitor(field, term) // Capture collapse field value - if hc.collapseField != "" && field == hc.collapseField { + if hc.collapseRequest != nil && field == hc.collapseRequest.Field { hc.collapseFieldValue = string(term) } } @@ -477,26 +492,21 @@ func MakeTopNDocumentMatchHandler( } } - // Handle field collapsing - deduplicate by field value - // Only proceed if this is the best document for its collapse key so far - if hc.collapseField != "" { + // Handle field collapsing + if hc.collapseRequest != nil { collapseKey := hc.collapseFieldValue // Following Elasticsearch behavior: treat missing values as empty string // and group them together - if existingDoc, seen := hc.collapseGroups[collapseKey]; seen { - // We've seen this collapse key before, check if new doc is better - cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, existingDoc) - if cmp >= 0 { - // Existing doc is better or equal, skip this one - ctx.DocumentMatchPool.Put(d) - return nil - } - // New doc is better, update the tracking map - // The old doc will eventually fall out of the top N naturally - hc.collapseGroups[collapseKey] = d - } else { - // First time seeing this collapse key, track it - hc.collapseGroups[collapseKey] = d + + group := hc.collapseGroups[collapseKey] + if group == nil { + group = &collapseGroup{} + hc.collapseGroups[collapseKey] = group + } + + // Maintain representative + per-inner_hits lists + if !hc.handleInnerHitsCollapse(group, d, ctx) { + return nil // Document was rejected } } @@ -586,27 +596,77 @@ func (hc *TopNCollector) finalizeResults(r index.IndexReader) error { return err } - // If collapse is enabled, filter results to only include the best document per collapse key - if hc.collapseField != "" { - // Create a set of documents that are in 
collapseGroups (i.e., current best for their key) - bestDocs := make(map[*search.DocumentMatch]bool) - for _, doc := range hc.collapseGroups { - bestDocs[doc] = true + // If collapse is enabled, build hierarchical response + if hc.collapseRequest != nil { + hc.results = hc.buildInnerHitsResponse(results) + } else { + hc.results = results + } + + return nil +} + +// buildInnerHitsResponse builds hierarchical response with inner_hits nested under representatives +func (hc *TopNCollector) buildInnerHitsResponse(results search.DocumentMatchCollection) search.DocumentMatchCollection { + // Create a set of representative documents + representatives := make(map[*search.DocumentMatch]*collapseGroup) + for _, group := range hc.collapseGroups { + if group.representative != nil { + representatives[group.representative] = group } + } + + // Filter results to only include representatives + filtered := make(search.DocumentMatchCollection, 0, len(representatives)) + for _, doc := range results { + if group, isRep := representatives[doc]; isRep { + // Attach inner_hits to this representative + doc.InnerHits = make(map[string]*search.InnerHitsResult) + + for _, ihReq := range hc.collapseRequest.InnerHits { + docs := group.innerHits[ihReq.Name] + if docs == nil { + docs = []*search.DocumentMatch{} + } + + // Apply pagination (from/size) + from := ihReq.From + size := ihReq.Size + if size < 1 { + size = 3 // Elasticsearch default + } + + total := uint64(len(docs)) - // Filter results to only include documents in bestDocs - filtered := make(search.DocumentMatchCollection, 0, len(results)) - for _, doc := range results { - if bestDocs[doc] { - filtered = append(filtered, doc) + // Sort the docs according to this inner_hits' sort order + if sortCtx := hc.collapseInnerHitsSorts[ihReq.Name]; sortCtx != nil { + // Use sort.Slice with the sort order's Compare method + sort.Slice(docs, func(i, j int) bool { + cmp := sortCtx.sort.Compare(sortCtx.scoring, sortCtx.desc, docs[i], docs[j]) + 
return cmp < 0 + }) + } + + // Apply from/size pagination; a negative from yields no hits instead of panicking + if from < 0 || from >= len(docs) { + docs = []*search.DocumentMatch{} + } else if size > len(docs)-from { + docs = docs[from:] + } else { + docs = docs[from : from+size] + } + + doc.InnerHits[ihReq.Name] = &search.InnerHitsResult{ + Hits: docs, + Total: total, + } } + + filtered = append(filtered, doc) } - hc.results = filtered - } else { - hc.results = results } - return nil + return filtered } // Results returns the collected hits @@ -647,19 +707,117 @@ func (hc *TopNCollector) SetKNNHits(knnHits search.DocumentMatchCollection, newS // SetCollapse enables field collapsing on the collector. // The collapse field must be a stored field with doc values. -func (hc *TopNCollector) SetCollapse(field string) { - hc.collapseField = field - hc.collapseGroups = make(map[string]*search.DocumentMatch) +func (hc *TopNCollector) SetCollapse(collapseReq *search.CollapseRequest) { + if collapseReq == nil { + return + } + + hc.collapseRequest = collapseReq + hc.collapseGroups = make(map[string]*collapseGroup) // Add collapse field to needed fields for doc value reading found := false for _, f := range hc.neededFields { - if f == field { + if f == collapseReq.Field { found = true break } } if !found { - hc.neededFields = append(hc.neededFields, field) + hc.neededFields = append(hc.neededFields, collapseReq.Field) + } + + // If inner_hits are configured, set up sort contexts for each + if len(collapseReq.InnerHits) > 0 { + hc.collapseInnerHitsSorts = make(map[string]*sortContext) + for _, ih := range collapseReq.InnerHits { + sortOrder := ih.Sort + if sortOrder == nil { + // Default to main sort order if not specified + sortOrder = hc.sort + } + scoring := sortOrder.CacheIsScore() + desc := sortOrder.CacheDescending() + hc.collapseInnerHitsSorts[ih.Name] = &sortContext{ + sort: sortOrder, + scoring: scoring, + desc: desc, + } + } + } +} + +// handleInnerHitsCollapse handles collapse with inner_hits 
configurations. +// It updates the group's representative using the main sort order and offers d to +// every configured inner_hits list using that list's own sort order. +// Returns false if document should be rejected (not in any list); the rejected +// document has then already been returned to ctx.DocumentMatchPool. +// NOTE(review): documents kept here are also added to the top-N store by the caller; +// confirm that a document evicted from the store is not recycled while still referenced. +func (hc *TopNCollector) handleInnerHitsCollapse(group *collapseGroup, d *search.DocumentMatch, ctx *search.SearchContext) bool { + // First, check if this should be the new representative (based on main sort) + if group.representative == nil { + group.representative = d + } else { + cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, group.representative) + if cmp < 0 { + // New doc is better, make it the representative + // Old representative might still be in inner_hits lists + group.representative = d + } } + + // Initialize inner_hits map if needed + if group.innerHits == nil { + group.innerHits = make(map[string][]*search.DocumentMatch) + } + + // Try to add to each inner_hits list (using their independent sort orders) + addedToAny := false + for _, ihReq := range hc.collapseRequest.InnerHits { + sortCtx := hc.collapseInnerHitsSorts[ihReq.Name] + docs := group.innerHits[ihReq.Name] + + size := ihReq.Size + if size < 1 { + size = 3 // Elasticsearch default + } + // Retain from+size candidates so the page selected by From in + // buildInnerHitsResponse is complete; the second test rejects int overflow. + limit := size + if ihReq.From > 0 && size+ihReq.From > size { + limit = size + ihReq.From + } + + if len(docs) < limit { + // List not full yet, add document + group.innerHits[ihReq.Name] = append(docs, d) + addedToAny = true + } else { + // List is full, find worst document according to this inner_hits' sort order + worstIdx := 0 + for i := 1; i < len(docs); i++ { + cmp := sortCtx.sort.Compare(sortCtx.scoring, sortCtx.desc, docs[i], docs[worstIdx]) + if cmp > 0 { + worstIdx = i + } + } + + // Compare new document with worst + cmp := sortCtx.sort.Compare(sortCtx.scoring, sortCtx.desc, d, docs[worstIdx]) + if cmp < 0 { + // New doc is better, replace worst + docs[worstIdx] = d + addedToAny = true + } + } + } + + // Only keep document if it's the representative or in at least one inner_hits list + if d != group.representative && !addedToAny { + ctx.DocumentMatchPool.Put(d) + return false + } + + return true } diff --git a/search/collector/topn_test.go b/search/collector/topn_test.go index 1f3ce1117..56d231f2f 
100644 --- a/search/collector/topn_test.go +++ b/search/collector/topn_test.go @@ -1153,7 +1153,8 @@ func TestCollapse(t *testing.T) { } collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) - collector.SetCollapse("document_id") + collapseReq := &search.CollapseRequest{Field: "document_id"} + collector.SetCollapse(collapseReq) err := collector.Collect(context.Background(), searcher, reader) if err != nil { @@ -1226,7 +1227,8 @@ func TestCollapseWithMissingValues(t *testing.T) { } collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) - collector.SetCollapse("document_id") + collapseReq := &search.CollapseRequest{Field: "document_id"} + collector.SetCollapse(collapseReq) err := collector.Collect(context.Background(), searcher, reader) if err != nil { @@ -1250,6 +1252,193 @@ func TestCollapseWithMissingValues(t *testing.T) { } } +// Test collapse with an inner_hits size > 1 +func TestCollapseWithSize(t *testing.T) { + // Create test data: multiple chunks per document + // doc1: chunks a (score 10), b (score 20), c (score 5) + // doc2: chunks d (score 15), e (score 25), f (score 8) + // doc3: chunk g (score 18) + // With an inner_hits size of 2, each group keeps its top 2 chunks as inner hits: + // - doc1: b (20), a (10) + // - doc2: e (25), d (15) + // - doc3: g (18) [only has one chunk] + // Main results: 3 representatives, sorted by score: e(25), b(20), g(18) + + fieldValues := map[string]string{ + "a": "doc1", + "b": "doc1", + "c": "doc1", + "d": "doc2", + "e": "doc2", + "f": "doc2", + "g": "doc3", + } + + searcher := &stubSearcher{ + matches: []*search.DocumentMatch{ + {IndexInternalID: index.IndexInternalID("a"), Score: 10}, + {IndexInternalID: index.IndexInternalID("b"), Score: 20}, + {IndexInternalID: index.IndexInternalID("c"), Score: 5}, + {IndexInternalID: index.IndexInternalID("d"), Score: 15}, + {IndexInternalID: index.IndexInternalID("e"), Score: 25}, + {IndexInternalID: index.IndexInternalID("f"), 
Score: 8}, + {IndexInternalID: index.IndexInternalID("g"), Score: 18}, + }, + } + + reader := &stubReaderWithCollapse{ + fieldValues: fieldValues, + fieldName: "document_id", + } + + collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) + innerHits := search.NewInnerHitsRequest("top") + innerHits.Size = 2 + collapseReq := &search.CollapseRequest{ + Field: "document_id", + InnerHits: []*search.InnerHitsRequest{innerHits}, + } + collector.SetCollapse(collapseReq) + + err := collector.Collect(context.Background(), searcher, reader) + if err != nil { + t.Fatal(err) + } + + total := collector.Total() + if total != 7 { + t.Errorf("expected 7 total hits, got %d", total) + } + + results := collector.Results() + + // Should get 3 representatives (one per document) + if len(results) != 3 { + t.Fatalf("expected 3 representatives after collapse, got %d", len(results)) + } + + // Verify representatives are sorted by score + // Result 0: e (doc2, score 25) + if results[0].ID != "e" { + t.Errorf("expected first representative 'e', got %s", results[0].ID) + } + if results[0].Score != 25 { + t.Errorf("expected first representative score 25, got %f", results[0].Score) + } + + // Check inner_hits for doc2 - should have 2 docs (e, d) + if results[0].InnerHits == nil || results[0].InnerHits["top"] == nil { + t.Fatal("expected inner_hits for doc2") + } + if len(results[0].InnerHits["top"].Hits) != 2 { + t.Errorf("expected 2 inner hits for doc2, got %d", len(results[0].InnerHits["top"].Hits)) + } + + // Result 1: b (doc1, score 20) + if results[1].ID != "b" { + t.Errorf("expected second representative 'b', got %s", results[1].ID) + } + + // Check inner_hits for doc1 - should have 2 docs (b, a) + if results[1].InnerHits == nil || results[1].InnerHits["top"] == nil { + t.Fatal("expected inner_hits for doc1") + } + if len(results[1].InnerHits["top"].Hits) != 2 { + t.Errorf("expected 2 inner hits for doc1, got %d", len(results[1].InnerHits["top"].Hits)) + } + 
+ // Result 2: g (doc3, score 18) + if results[2].ID != "g" { + t.Errorf("expected third representative 'g', got %s", results[2].ID) + } + + // Check inner_hits for doc3 - should have 1 doc (g) + if results[2].InnerHits == nil || results[2].InnerHits["top"] == nil { + t.Fatal("expected inner_hits for doc3") + } + if len(results[2].InnerHits["top"].Hits) != 1 { + t.Errorf("expected 1 inner hit for doc3, got %d", len(results[2].InnerHits["top"].Hits)) + } +} + +// Test collapse with inner_hits size=3 where the groups have fewer docs than size +func TestCollapseWithSizeLargerThanGroup(t *testing.T) { + // doc1: chunks a (score 10), b (score 20) + // doc2: chunk c (score 15) + // With an inner_hits size of 3, each group's inner hits hold all available chunks: + // - doc1: b (20), a (10) [only has 2, not 3] + // - doc2: c (15) [only has 1] + // Main results: 2 representatives, b (20) then c (15) + + fieldValues := map[string]string{ + "a": "doc1", + "b": "doc1", + "c": "doc2", + } + + searcher := &stubSearcher{ + matches: []*search.DocumentMatch{ + {IndexInternalID: index.IndexInternalID("a"), Score: 10}, + {IndexInternalID: index.IndexInternalID("b"), Score: 20}, + {IndexInternalID: index.IndexInternalID("c"), Score: 15}, + }, + } + + reader := &stubReaderWithCollapse{ + fieldValues: fieldValues, + fieldName: "document_id", + } + + collector := NewTopNCollector(10, 0, search.SortOrder{&search.SortScore{Desc: true}}) + innerHits := search.NewInnerHitsRequest("top") + innerHits.Size = 3 + collapseReq := &search.CollapseRequest{ + Field: "document_id", + InnerHits: []*search.InnerHitsRequest{innerHits}, + } + collector.SetCollapse(collapseReq) + + err := collector.Collect(context.Background(), searcher, reader) + if err != nil { + t.Fatal(err) + } + + results := collector.Results() + + // Should get 2 representatives (one per document) + if len(results) != 2 { + t.Fatalf("expected 2 representatives, got %d", len(results)) + } + + // Result 0: b (doc1, score 20) - highest overall + if results[0].ID != "b" || results[0].Score != 20 { + 
t.Errorf("expected first representative 'b' with score 20, got '%s' with score %f", + results[0].ID, results[0].Score) + } + + // Check inner_hits for doc1 - should have 2 docs (only has 2, not 3) + if results[0].InnerHits == nil || results[0].InnerHits["top"] == nil { + t.Fatal("expected inner_hits for doc1") + } + if len(results[0].InnerHits["top"].Hits) != 2 { + t.Errorf("expected 2 inner hits for doc1, got %d", len(results[0].InnerHits["top"].Hits)) + } + + // Result 1: c (doc2, score 15) + if results[1].ID != "c" || results[1].Score != 15 { + t.Errorf("expected second representative 'c' with score 15, got '%s' with score %f", + results[1].ID, results[1].Score) + } + + // Check inner_hits for doc2 - should have 1 doc (only has 1) + if results[1].InnerHits == nil || results[1].InnerHits["top"] == nil { + t.Fatal("expected inner_hits for doc2") + } + if len(results[1].InnerHits["top"].Hits) != 1 { + t.Errorf("expected 1 inner hit for doc2, got %d", len(results[1].InnerHits["top"].Hits)) + } +} + // stubReaderWithCollapse is a stub reader that can return collapse field values type stubReaderWithCollapse struct { stubReader diff --git a/search/search.go b/search/search.go index 46e32fed9..ebab59735 100644 --- a/search/search.go +++ b/search/search.go @@ -138,6 +138,57 @@ type FieldTermLocation struct { type FieldFragmentMap map[string][]string +// InnerHitsResult contains the additional documents from a collapsed group. +// Similar to Elasticsearch's inner_hits response structure. +type InnerHitsResult struct { + Hits DocumentMatchCollection `json:"hits"` + Total uint64 `json:"total"` // Total documents in this group before pagination +} + +// InnerHitsRequest describes how to retrieve additional documents from each collapsed group. +// Similar to Elasticsearch's inner_hits feature. 
+type InnerHitsRequest struct { + Name string `json:"name"` // Name for this inner_hits section + Size int `json:"size,omitempty"` // Number of documents to return (NewInnerHitsRequest presets 3; the zero value is 0) + From int `json:"from,omitempty"` // Offset for pagination within the group + Sort SortOrder `json:"sort,omitempty"` // Independent sort order for inner hits + Fields []string `json:"fields,omitempty"` // Fields to include in inner hit documents + + // Second-level collapse for hierarchical grouping + Collapse *CollapseRequest `json:"collapse,omitempty"` +} + +// NewInnerHitsRequest creates a new inner_hits request with the given name and Size preset to 3. +func NewInnerHitsRequest(name string) *InnerHitsRequest { + return &InnerHitsRequest{ + Name: name, + Size: 3, // Elasticsearch default + } +} + +// CollapseRequest describes field collapsing for search results. +// Collapses search results based on a field value, returning one representative +// document per unique field value in the main results, with optional inner_hits +// containing additional documents from each group. +// Similar to Elasticsearch's collapse feature with inner_hits. +type CollapseRequest struct { + Field string `json:"field"` + + // InnerHits configurations for retrieving additional documents per group. + // Each inner_hits configuration can have independent sort order, pagination, and field selection. + InnerHits []*InnerHitsRequest `json:"inner_hits,omitempty"` +} + +// NewCollapseRequest creates a new collapse request for the specified field. +// The request collapses results to one representative document per unique field value. +// Pass innerHits to retrieve additional documents from each group; with none, InnerHits stays nil. +func NewCollapseRequest(field string, innerHits ...*InnerHitsRequest) *CollapseRequest { + return &CollapseRequest{ + Field: field, + InnerHits: innerHits, + } +} + type DocumentMatch struct { Index string `json:"index,omitempty"` ID string `json:"id"` @@ -154,6 +205,11 @@ type DocumentMatch struct { // fields as float64s and date fields as strings.
Fields map[string]interface{} `json:"fields,omitempty"` + // InnerHits contains additional documents from the same collapsed group. + // Only populated when collapse with inner_hits is configured. + // Map key is the inner_hits name. + InnerHits map[string]*InnerHitsResult `json:"inner_hits,omitempty"` + // used to maintain natural index order HitNumber uint64 `json:"-"` diff --git a/search_knn.go b/search_knn.go index 02078bbe3..d3eef7b99 100644 --- a/search_knn.go +++ b/search_knn.go @@ -51,7 +51,7 @@ type SearchRequest struct { Score string `json:"score,omitempty"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` - Collapse *CollapseRequest `json:"collapse,omitempty"` + Collapse *search.CollapseRequest `json:"collapse,omitempty"` KNN []*KNNRequest `json:"knn"` KNNOperator knnOperator `json:"knn_operator"` diff --git a/search_no_knn.go b/search_no_knn.go index 0c13d86a3..119ec9252 100644 --- a/search_no_knn.go +++ b/search_no_knn.go @@ -64,7 +64,7 @@ type SearchRequest struct { Score string `json:"score,omitempty"` SearchAfter []string `json:"search_after"` SearchBefore []string `json:"search_before"` - Collapse *CollapseRequest `json:"collapse,omitempty"` + Collapse *search.CollapseRequest `json:"collapse,omitempty"` // PreSearchData will be a map that will be used // in the second phase of any 2-phase search, to provide additional @@ -96,12 +96,12 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error { Explain bool `json:"explain"` Sort []json.RawMessage `json:"sort"` IncludeLocations bool `json:"includeLocations"` - Score string `json:"score"` - SearchAfter []string `json:"search_after"` - SearchBefore []string `json:"search_before"` - Collapse *CollapseRequest `json:"collapse"` - PreSearchData json.RawMessage `json:"pre_search_data"` - Params json.RawMessage `json:"params"` + Score string `json:"score"` + SearchAfter []string `json:"search_after"` + SearchBefore []string `json:"search_before"` + Collapse 
*search.CollapseRequest `json:"collapse"` + PreSearchData json.RawMessage `json:"pre_search_data"` + Params json.RawMessage `json:"params"` } err := json.Unmarshal(input, &temp)