diff --git a/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsFormat.java b/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsFormat.java index ccc61ea..f16ea02 100644 --- a/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsFormat.java +++ b/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsFormat.java @@ -7,6 +7,7 @@ import static com.nvidia.cuvs.lucene.ThreadLocalCuVSResourcesProvider.assertIsSupported; import com.nvidia.cuvs.LibraryException; +import com.nvidia.cuvs.spi.CuVSProvider; import java.io.IOException; import org.apache.lucene.codecs.KnnVectorsFormat; import org.apache.lucene.codecs.KnnVectorsReader; @@ -39,6 +40,7 @@ public class CuVS2510GPUVectorsFormat extends KnnVectorsFormat { static { try { + CuVSProvider.provider().enableRMMAsyncMemory(); LUCENE_PROVIDER = LuceneProvider.getInstance("99"); FLAT_VECTORS_FORMAT = LUCENE_PROVIDER.getLuceneFlatVectorsFormatInstance(DefaultFlatVectorScorer.INSTANCE); diff --git a/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsReader.java b/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsReader.java index c783660..caa7fab 100644 --- a/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsReader.java +++ b/src/main/java/com/nvidia/cuvs/lucene/CuVS2510GPUVectorsReader.java @@ -443,6 +443,8 @@ public void search(String field, float[] target, KnnCollector knnCollector, Bits new CagraSearchParams.Builder() .withItopkSize(Math.max(collector.getiTopK(), topK)) .withSearchWidth(collector.getSearchWidth()) + .withThreadBlockSize(collector.getThreadBlockSize()) + .withAlgo(collector.getSearchAlgo()) .build(); } else { // Setting itopK as topK because in any case iTopK should be ATLEAST equal to topK @@ -600,6 +602,37 @@ private static void checkVersion(int versionMeta, int versionVectorData, IndexIn } } + /** + * Maps a local segment ordinal to a Lucene doc ID within this segment. + * + *

Used by {@link GPUKnnFloatVectorQuery} after a multi-segment GPU search to convert + * select_k result ordinals to doc IDs before adding {@code docBase}. + * + * @param field the vector field name + * @param ordinal the local ordinal returned by CAGRA + * @return the Lucene doc ID within this segment + * @throws IOException if the vector values cannot be read + */ + public int ordToDoc(String field, int ordinal) throws IOException { + return flatVectorsReader.getFloatVectorValues(field).ordToDoc(ordinal); + } + + /** + * Returns the {@link CagraIndex} for the given field, or {@code null} if unavailable + * (e.g., during a merge or when the field is missing). + * + * @param field the vector field name + * @return the CAGRA index, or {@code null} + */ + public CagraIndex getCagraIndexForField(String field) { + if (cuvsIndices == null) return null; + FieldInfo info = fieldInfos.fieldInfo(field); + if (info == null) return null; + GPUIndex gpuIndex = cuvsIndices.get(info.number); + if (gpuIndex == null) return null; + return gpuIndex.getCagraIndex(); + } + /** * Gets the instance of FieldInfos. 
* diff --git a/src/main/java/com/nvidia/cuvs/lucene/FilterCuVSProvider.java b/src/main/java/com/nvidia/cuvs/lucene/FilterCuVSProvider.java index 07e64fb..ac7dbc4 100644 --- a/src/main/java/com/nvidia/cuvs/lucene/FilterCuVSProvider.java +++ b/src/main/java/com/nvidia/cuvs/lucene/FilterCuVSProvider.java @@ -145,6 +145,11 @@ public HnswIndex hnswIndexFromCagra(HnswIndexParams arg0, CagraIndex arg1) throw return delegate.hnswIndexFromCagra(arg0, arg1); } + @Override + public void enableRMMAsyncMemory() { + delegate.enableRMMAsyncMemory(); + } + @Override public void enableRMMManagedPooledMemory(int arg0, int arg1) { delegate.enableRMMManagedPooledMemory(arg0, arg1); diff --git a/src/main/java/com/nvidia/cuvs/lucene/GPUKnnFloatVectorQuery.java b/src/main/java/com/nvidia/cuvs/lucene/GPUKnnFloatVectorQuery.java index 6a9daae..8de69f4 100644 --- a/src/main/java/com/nvidia/cuvs/lucene/GPUKnnFloatVectorQuery.java +++ b/src/main/java/com/nvidia/cuvs/lucene/GPUKnnFloatVectorQuery.java @@ -4,17 +4,58 @@ */ package com.nvidia.cuvs.lucene; +import static com.nvidia.cuvs.lucene.ThreadLocalCuVSResourcesProvider.getCuVSResourcesInstance; + +import com.nvidia.cuvs.CagraIndex; +import com.nvidia.cuvs.CagraQuery; +import com.nvidia.cuvs.CagraSearchParams; +import com.nvidia.cuvs.CuVSMatrix; +import com.nvidia.cuvs.CuVSResources; +import com.nvidia.cuvs.MultiSegmentCagraSearch; +import com.nvidia.cuvs.MultiSegmentSearchResults; import java.io.IOException; -import org.apache.lucene.index.LeafReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Comparator; +import java.util.List; +import org.apache.lucene.codecs.KnnVectorsReader; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Explanation; +import 
org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.KnnFloatVectorQuery; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.ScorerSupplier; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.Weight; import org.apache.lucene.search.knn.KnnCollectorManager; import org.apache.lucene.util.Bits; /** - * Extends upon KnnFloatVectorQuery for GPU-only search. + * Extends {@link KnnFloatVectorQuery} for GPU-only search. + * + *

When all index segments use {@link CuVS2510GPUVectorsReader} and the query uses CAGRA + * (k ≤ 1024, no explicit filter), {@link #rewrite} runs a globally-optimized + * multi-segment search: + *

<ol>
+ *   <li>All per-segment CAGRA searches write into a single shared device buffer without any
+ *       per-segment device-to-host copy or stream synchronization.</li>
+ *   <li>A single {@code cuvsSelectK} call finds the global top-k entirely on GPU.</li>
+ *   <li>Results are copied to host in one pass and mapped to Lucene doc IDs.</li>
+ * </ol>
+ *
+ * <p>

Falls back to the standard per-segment Lucene path when the optimized path cannot be + * applied (mixed segment types, explicit query filter, k > 1024, brute-force + * fallback needed, or missing CAGRA index). * * @since 25.10 */ @@ -22,24 +63,172 @@ public class GPUKnnFloatVectorQuery extends KnnFloatVectorQuery { private final int iTopK; private final int searchWidth; - + private final int threadBlockSize; + private final CagraSearchParams.SearchAlgo searchAlgo; /** - * Initializes {@link GPUKnnFloatVectorQuery} + * Initializes {@link GPUKnnFloatVectorQuery} with {@link CagraSearchParams.SearchAlgo#AUTO}, + * and max_iterations auto-selected (0). * - * @param field the vector field name - * @param target the vector target query - * @param k the topK value - * @param filter instance of the Query - * @param iTopK the iTopK value - * @param searchWidth the search width + * @param field the vector field name + * @param target the query vector + * @param k the number of nearest neighbors to return + * @param filter optional pre-filter query + * @param iTopK CAGRA itopk_size parameter + * @param searchWidth CAGRA search_width parameter */ public GPUKnnFloatVectorQuery( String field, float[] target, int k, Query filter, int iTopK, int searchWidth) { + this(field, target, k, filter, iTopK, searchWidth, 0, CagraSearchParams.SearchAlgo.AUTO); + } + + /** + * Initializes {@link GPUKnnFloatVectorQuery}. 
+   *
+   * @param field the vector field name
+   * @param target the query vector
+   * @param k the number of nearest neighbors to return
+   * @param filter optional pre-filter query
+   * @param iTopK CAGRA itopk_size parameter
+   * @param searchWidth CAGRA search_width parameter
+   * @param threadBlockSize CAGRA thread_block_size (0 = auto)
+   * @param searchAlgo CAGRA search algorithm
+   */
+  public GPUKnnFloatVectorQuery(
+      String field,
+      float[] target,
+      int k,
+      Query filter,
+      int iTopK,
+      int searchWidth,
+      int threadBlockSize,
+      CagraSearchParams.SearchAlgo searchAlgo) {
     super(field, target, k, filter);
     this.iTopK = iTopK;
     this.searchWidth = searchWidth;
+    this.threadBlockSize = threadBlockSize;
+    this.searchAlgo = searchAlgo;
   }
 
+  // -------------------------------------------------------------------------
+  // Optimized multi-segment path
+  // -------------------------------------------------------------------------
+
+  /**
+   * Runs the multi-segment CAGRA search eagerly and rewrites to a query over the pre-scored hits.
+   *
+   * <p>Delegates to {@code super.rewrite} when a filter is set, when {@code k > 1024}, or when
+   * any leaf has no {@link CuVS2510GPUVectorsReader} or no CAGRA index for the field.
+   *
+   * @param indexSearcher searcher whose index leaves are searched
+   * @return a query matching the global top-k hits, or a {@link MatchNoDocsQuery} when the index
+   *     has no leaves or the search returns no results
+   * @throws IOException if the underlying search or ordinal mapping reports an I/O failure
+   */
+  @Override
+  public Query rewrite(IndexSearcher indexSearcher) throws IOException {
+    // Only apply the optimized path when there is no explicit filter.
+    // Live-document filtering (deletions) is handled via the per-segment liveDocs prefilter below.
+    if (filter != null) {
+      return super.rewrite(indexSearcher);
+    }
+    // CAGRA search is capped at k=1024.
+    if (k > 1024) {
+      return super.rewrite(indexSearcher);
+    }
+
+    IndexReader reader = indexSearcher.getIndexReader();
+    List<LeafReaderContext> leaves = reader.leaves();
+    if (leaves.isEmpty()) {
+      return new MatchNoDocsQuery();
+    }
+
+    // Collect a CuVS2510GPUVectorsReader for every segment; fall back if any segment
+    // lacks one or has no CAGRA index for this field.
+    List<CuVS2510GPUVectorsReader> gpuReaders = new ArrayList<>(leaves.size());
+    for (LeafReaderContext ctx : leaves) {
+      CuVS2510GPUVectorsReader gpuReader = unwrapGpuReader(ctx);
+      if (gpuReader == null || gpuReader.getCagraIndexForField(field) == null) {
+        return super.rewrite(indexSearcher);
+      }
+      gpuReaders.add(gpuReader);
+    }
+
+    // Compute max_iterations from the largest segment so all segments use the same value,
+    // ensuring consistent search quality across segments of different sizes.
+    int maxDatasetSize = 0;
+    int graphDegree = 0;
+    for (int i = 0; i < gpuReaders.size(); i++) {
+      CuVSMatrix graph = gpuReaders.get(i).getCagraIndexForField(field).getGraph();
+      int segSize = (int) graph.size();
+      if (segSize > maxDatasetSize) {
+        maxDatasetSize = segSize;
+        graphDegree = (int) graph.columns();
+      }
+    }
+    int maxIterations =
+        computeMaxIterations(Math.max(iTopK, k), searchWidth, maxDatasetSize, graphDegree);
+
+    // Build one CagraIndex + CagraQuery per segment.
+    CuVSResources resources = getCuVSResourcesInstance();
+    List<CagraIndex> cagraIndices = new ArrayList<>(leaves.size());
+    List<CagraQuery> cagraQueries = new ArrayList<>(leaves.size());
+
+    try {
+      float[] target = getTargetCopy();
+      CagraSearchParams searchParams =
+          new CagraSearchParams.Builder()
+              .withItopkSize(Math.max(iTopK, k))
+              .withSearchWidth(searchWidth)
+              .withMaxIterations(maxIterations)
+              .withThreadBlockSize(threadBlockSize)
+              .withAlgo(searchAlgo)
+              .build();
+
+      // Upload the query vector to device once and share it across all per-segment CagraQueries.
+      CuVSMatrix.Builder vectorBuilder =
+          CuVSMatrix.deviceBuilder(resources, 1, target.length, CuVSMatrix.DataType.FLOAT);
+      vectorBuilder.addVector(target);
+
+      ScoreDoc[] scoreDocs;
+      try (CuVSMatrix queryVector = vectorBuilder.build()) {
+        for (int i = 0; i < leaves.size(); i++) {
+          LeafReaderContext ctx = leaves.get(i);
+          cagraIndices.add(gpuReaders.get(i).getCagraIndexForField(field));
+
+          // Pass live-document bits as the prefilter so deleted docs are excluded.
+          Bits liveDocs = ctx.reader().getLiveDocs();
+          CagraQuery query =
+              buildCagraQuery(
+                  resources, queryVector, k, searchParams, liveDocs, gpuReaders.get(i), ctx);
+          cagraQueries.add(query);
+        }
+
+        MultiSegmentSearchResults results =
+            MultiSegmentCagraSearch.search(resources, cagraIndices, cagraQueries, k);
+
+        if (results.count() == 0) {
+          return new MatchNoDocsQuery();
+        }
+
+        // Map (segmentIdx, ordinal) → global Lucene doc ID; compute normalized score.
+        scoreDocs = new ScoreDoc[results.count()];
+        for (int j = 0; j < results.count(); j++) {
+          int segIdx = results.getSegmentIndex(j);
+          int ordinal = results.getOrdinal(j);
+          float dist = results.getDistance(j);
+
+          LeafReaderContext ctx = leaves.get(segIdx);
+          int localDoc = gpuReaders.get(segIdx).ordToDoc(field, ordinal);
+          int globalDoc = ctx.docBase + localDoc;
+          // NOTE(review): 1 / (1 + dist) presumes a smaller-is-better distance — confirm that
+          // this matches the field's similarity function.
+          float score = 1.0f / (1.0f + dist);
+          scoreDocs[j] = new ScoreDoc(globalDoc, score);
+        }
+
+        Arrays.sort(scoreDocs, Comparator.comparingDouble((ScoreDoc sd) -> sd.score).reversed());
+
+        return docAndScoreQuery(scoreDocs);
+      }
+
+    } catch (Throwable t) {
+      if (t instanceof IOException) throw (IOException) t;
+      if (t instanceof RuntimeException) throw (RuntimeException) t;
+      // Propagate JVM errors (OutOfMemoryError, LinkageError, ...) unchanged instead of
+      // disguising them as a RuntimeException.
+      if (t instanceof Error) throw (Error) t;
+      throw new RuntimeException("Multi-segment GPU search failed", t);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // Per-segment fallback path (used when filter != null, k > 1024, or a segment lacks CAGRA)
+  // -------------------------------------------------------------------------
+
   @Override
   protected TopDocs approximateSearch(
       LeafReaderContext context,
@@ -47,12 +236,220 @@ protected TopDocs approximateSearch(
       int visitedLimit,
       KnnCollectorManager knnCollectorManager)
       throws IOException {
-
     GPUPerLeafCuVSKnnCollector results =
-        new GPUPerLeafCuVSKnnCollector(k, visitedLimit, iTopK, searchWidth);
-
-    LeafReader reader = context.reader();
-    reader.searchNearestVectors(field, this.getTargetCopy(), results, acceptDocs);
+        new GPUPerLeafCuVSKnnCollector(
k, + visitedLimit, + iTopK, + searchWidth, + threadBlockSize, + searchAlgo); + context.reader().searchNearestVectors(field, getTargetCopy(), results, acceptDocs); return results.topDocs(); } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /** + * Mirrors {@code adjust_search_params()} from {@code search_plan.cuh}: computes the + * {@code max_iterations} value that CAGRA would auto-select for the given parameters. + * + *

Called with the largest segment's size so that all segments produce the same + * value, ensuring consistent search quality regardless of segment size. + */ + private static int computeMaxIterations( + int itopkSize, int searchWidth, int datasetSize, int graphDegree) { + int maxIter = itopkSize / searchWidth; + long numReachableNodes = 1; + long factor = Math.max(2L, graphDegree / 2); + while (numReachableNodes < datasetSize) { + numReachableNodes *= factor; + maxIter++; + } + return maxIter; + } + + /** + * Unwraps the {@link LeafReaderContext}'s reader to a {@link CuVS2510GPUVectorsReader}, or + * returns {@code null} if the reader is not of that type. + */ + private static CuVS2510GPUVectorsReader unwrapGpuReader(LeafReaderContext ctx) { + var unwrapped = FilterLeafReader.unwrap(ctx.reader()); + if (!(unwrapped instanceof CodecReader)) return null; + KnnVectorsReader vr = ((CodecReader) unwrapped).getVectorReader(); + return (vr instanceof CuVS2510GPUVectorsReader gpuReader) ? gpuReader : null; + } + + /** + * Builds a {@link CagraQuery} for a single segment, incorporating live-document filtering + * via a prefilter bitset when deletions are present. + */ + private CagraQuery buildCagraQuery( + CuVSResources resources, + CuVSMatrix queryVector, + int topK, + CagraSearchParams searchParams, + Bits liveDocs, + CuVS2510GPUVectorsReader gpuReader, + LeafReaderContext ctx) + throws IOException { + CagraQuery.Builder queryBuilder = + new CagraQuery.Builder(resources) + .withTopK(topK) + .withSearchParams(searchParams) + .withQueryVectors(queryVector); + + if (liveDocs != null) { + // Convert liveDocs to a BitSet over vector ordinals so CAGRA can filter on GPU. 
+ var rawValues = gpuReader.getFloatVectorValues(field); + Bits acceptedOrds = rawValues.getAcceptOrds(liveDocs); + int length = acceptedOrds.length(); + BitSet mask = new BitSet(length); + for (int i = 0; i < length; i++) { + if (acceptedOrds.get(i)) mask.set(i); + } + queryBuilder.withPrefilter(mask, length); + } + + return queryBuilder.build(); + } + + /** + * Builds a {@link Query} that matches exactly the given pre-scored documents. + * + *

<p>Partitions {@code scoreDocs} by segment (a global doc ID belongs to a leaf when it lies in
+   * {@code [docBase, docBase + maxDoc)}), then returns a {@link Scorer} per segment that iterates
+   * those docs in ascending doc-ID order and replays their pre-computed scores multiplied by the
+   * query boost. {@code advance} always steps forward first, so it is safe on a fresh iterator.
+   */
+  private static Query docAndScoreQuery(ScoreDoc[] scoreDocs) {
+    return new Query() {
+      @Override
+      public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost)
+          throws IOException {
+        return new Weight(this) {
+          @Override
+          public ScorerSupplier scorerSupplier(LeafReaderContext ctx) {
+            int base = ctx.docBase;
+            int maxDoc = base + ctx.reader().maxDoc();
+            // Collect docs belonging to this segment; re-sort by local doc ID ascending.
+            int[] localDocs = new int[scoreDocs.length];
+            float[] localScores = new float[scoreDocs.length];
+            int count = 0;
+            for (ScoreDoc sd : scoreDocs) {
+              if (sd.doc >= base && sd.doc < maxDoc) {
+                localDocs[count] = sd.doc - base;
+                localScores[count] = sd.score * boost;
+                count++;
+              }
+            }
+            if (count == 0) return null;
+            final int n = count;
+            Integer[] idx = new Integer[n];
+            for (int i = 0; i < n; i++) idx[i] = i;
+            Arrays.sort(idx, Comparator.comparingInt(i -> localDocs[i]));
+            final int[] sortedDocs = new int[n];
+            final float[] sortedScores = new float[n];
+            for (int i = 0; i < n; i++) {
+              sortedDocs[i] = localDocs[idx[i]];
+              sortedScores[i] = localScores[idx[i]];
+            }
+
+            return new ScorerSupplier() {
+              @Override
+              public Scorer get(long leadCost) {
+                return new Scorer() {
+                  private int pos = -1;
+
+                  @Override
+                  public DocIdSetIterator iterator() {
+                    return new DocIdSetIterator() {
+                      @Override
+                      public int docID() {
+                        return pos < 0 ? -1 : (pos >= n ? NO_MORE_DOCS : sortedDocs[pos]);
+                      }
+
+                      @Override
+                      public int nextDoc() {
+                        pos++;
+                        return docID();
+                      }
+
+                      @Override
+                      public int advance(int target) {
+                        // pos starts at -1: step first so sortedDocs[-1] is never read.
+                        do {
+                          pos++;
+                        } while (pos < n && sortedDocs[pos] < target);
+                        return docID();
+                      }
+
+                      @Override
+                      public long cost() {
+                        return n;
+                      }
+                    };
+                  }
+
+                  @Override
+                  public float getMaxScore(int upTo) {
+                    return Float.MAX_VALUE;
+                  }
+
+                  @Override
+                  public float score() {
+                    return sortedScores[pos];
+                  }
+
+                  @Override
+                  public int docID() {
+                    return pos < 0
+                        ? -1
+                        : (pos >= n ? DocIdSetIterator.NO_MORE_DOCS : sortedDocs[pos]);
+                  }
+                };
+              }
+
+              @Override
+              public long cost() {
+                return n;
+              }
+            };
+          }
+
+          @Override
+          public boolean isCacheable(LeafReaderContext ctx) {
+            return false;
+          }
+
+          @Override
+          public Explanation explain(LeafReaderContext ctx, int doc) {
+            for (ScoreDoc sd : scoreDocs) {
+              if (sd.doc == ctx.docBase + doc) {
+                return Explanation.match(sd.score * boost, "GPU multi-segment CAGRA search");
+              }
+            }
+            return Explanation.noMatch("not a GPU search result");
+          }
+        };
+      }
+
+      @Override
+      public String toString(String field) {
+        return "GPUDocAndScoreQuery";
+      }
+
+      @Override
+      public void visit(QueryVisitor visitor) {}
+
+      @Override
+      public boolean equals(Object o) {
+        return this == o;
+      }
+
+      @Override
+      public int hashCode() {
+        return System.identityHashCode(this);
+      }
+    };
+  }
 }
diff --git a/src/main/java/com/nvidia/cuvs/lucene/GPUPerLeafCuVSKnnCollector.java b/src/main/java/com/nvidia/cuvs/lucene/GPUPerLeafCuVSKnnCollector.java
index 421b2ba..1c6d5ec 100644
--- a/src/main/java/com/nvidia/cuvs/lucene/GPUPerLeafCuVSKnnCollector.java
+++ b/src/main/java/com/nvidia/cuvs/lucene/GPUPerLeafCuVSKnnCollector.java
@@ -4,6 +4,7 @@
  */
 package com.nvidia.cuvs.lucene;
 
+import com.nvidia.cuvs.CagraSearchParams;
 import org.apache.lucene.search.TopKnnCollector;
 
 /**
@@ -15,6 +16,8 @@ class GPUPerLeafCuVSKnnCollector extends TopKnnCollector {
 
   private int iTopK;
   private int searchWidth;
+  private int threadBlockSize;
+  private
CagraSearchParams.SearchAlgo searchAlgo; /** * Initializes {@link GPUPerLeafCuVSKnnCollector} @@ -22,11 +25,21 @@ class GPUPerLeafCuVSKnnCollector extends TopKnnCollector { * @param topK the topK value * @param iTopK the iTopK value * @param searchWidth the search width + * @param threadBlockSize CAGRA thread_block_size (0 = auto; controls worker_queue_size) + * @param searchAlgo the CAGRA search algorithm */ - public GPUPerLeafCuVSKnnCollector(int topK, int visitLimit, int iTopK, int searchWidth) { + public GPUPerLeafCuVSKnnCollector( + int topK, + int visitLimit, + int iTopK, + int searchWidth, + int threadBlockSize, + CagraSearchParams.SearchAlgo searchAlgo) { super(topK, visitLimit); this.iTopK = iTopK > topK ? iTopK : topK; this.searchWidth = searchWidth; + this.threadBlockSize = threadBlockSize; + this.searchAlgo = searchAlgo; } public int getiTopK() { @@ -36,4 +49,13 @@ public int getiTopK() { public int getSearchWidth() { return searchWidth; } + + public int getThreadBlockSize() { + return threadBlockSize; + } + + public CagraSearchParams.SearchAlgo getSearchAlgo() { + return searchAlgo; + } + } diff --git a/src/main/java/com/nvidia/cuvs/lucene/ThreadLocalCuVSResourcesProvider.java b/src/main/java/com/nvidia/cuvs/lucene/ThreadLocalCuVSResourcesProvider.java index 9e259e2..69d37a3 100644 --- a/src/main/java/com/nvidia/cuvs/lucene/ThreadLocalCuVSResourcesProvider.java +++ b/src/main/java/com/nvidia/cuvs/lucene/ThreadLocalCuVSResourcesProvider.java @@ -42,9 +42,17 @@ public static void setCuVSResourcesInstance(CuVSResources resources) { cuVSResources.set(resources); } + /** System property controlling the workspace pool size per resources handle (in bytes). 
*/ + public static final String WORKSPACE_POOL_SIZE_PROPERTY = "com.nvidia.cuvs.workspacePoolSize"; + private static CuVSResources cuVSResourcesOrNull() { try { - return CuVSResources.create(); + CuVSResources resources = CuVSResources.create(); + String poolSizeProp = System.getProperty(WORKSPACE_POOL_SIZE_PROPERTY); + if (poolSizeProp != null) { + resources.setWorkspacePool(Long.parseLong(poolSizeProp)); + } + return resources; } catch (UnsupportedOperationException uoe) { log.log( Level.WARNING,