diff --git a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h index 2eaf3123a0..4037a451ae 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h +++ b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h @@ -373,6 +373,12 @@ void parse_build_param(const nlohmann::json& conf, ace_params.ef_construction = ace_conf.at("ef_construction"); } if (ace_conf.contains("use_disk")) { ace_params.use_disk = ace_conf.at("use_disk"); } + if (ace_conf.contains("max_host_memory_gb")) { + ace_params.max_host_memory_gb = ace_conf.at("max_host_memory_gb"); + } + if (ace_conf.contains("max_gpu_memory_gb")) { + ace_params.max_gpu_memory_gb = ace_conf.at("max_gpu_memory_gb"); + } cagra_params.graph_build_params = ace_params; } ::parse_build_param(conf, cagra_params); diff --git a/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib.cu b/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib.cu index 26028b6d98..2a881f7820 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib.cu +++ b/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib.cu @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -20,6 +20,9 @@ auto parse_build_param(const nlohmann::json& conf) -> typename cuvs::bench::cuvs_cagra_hnswlib::build_param param; auto& hnsw_params = param.hnsw_index_params; auto& cagra_params = param.cagra_build_params; + if (conf.contains("use_original_id_graph")) { + param.use_original_id_graph = conf.at("use_original_id_graph"); + } if (conf.contains("hierarchy")) { if (conf.at("hierarchy") == "none") { hnsw_params.hierarchy = cuvs::neighbors::hnsw::HnswHierarchy::NONE; @@ -57,7 +60,8 @@ auto parse_build_param(const nlohmann::json& conf) -> ps.metric = dist_type; // Parse ACE parameters if provided if (conf.contains("npartitions") || conf.contains("build_dir") || - conf.contains("ef_construction") || conf.contains("use_disk")) { + conf.contains("ef_construction") || conf.contains("use_disk") || + conf.contains("max_host_memory_gb") || conf.contains("max_gpu_memory_gb")) { auto ace_params = cuvs::neighbors::cagra::graph_build_params::ace_params(); if (conf.contains("npartitions")) { ace_params.npartitions = conf.at("npartitions"); } if (conf.contains("build_dir")) { ace_params.build_dir = conf.at("build_dir"); } @@ -65,6 +69,12 @@ auto parse_build_param(const nlohmann::json& conf) -> ace_params.ef_construction = conf.at("ef_construction"); } if (conf.contains("use_disk")) { ace_params.use_disk = conf.at("use_disk"); } + if (conf.contains("max_host_memory_gb")) { + ace_params.max_host_memory_gb = conf.at("max_host_memory_gb"); + } + if (conf.contains("max_gpu_memory_gb")) { + ace_params.max_gpu_memory_gb = conf.at("max_gpu_memory_gb"); + } ps.graph_build_params = ace_params; } // NB: above, we only provide the defaults. Below we parse the explicit parameters as usual. diff --git a/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib_wrapper.h b/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib_wrapper.h index 2f0c54e1bd..0d3dbd70f3 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib_wrapper.h +++ b/cpp/bench/ann/src/cuvs/cuvs_cagra_hnswlib_wrapper.h @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION. 
 * SPDX-License-Identifier: Apache-2.0
 */

#pragma once
@@ -22,6 +22,7 @@ class cuvs_cagra_hnswlib : public algo<T>, public algo_gpu {
     using cagra_wrapper_params = typename cuvs_cagra<T, IdxT>::build_param;
     cagra_wrapper_params cagra_build_params;
     cuvs::neighbors::hnsw::index_params hnsw_index_params;
+    bool use_original_id_graph = false;
   };

   struct search_param : public search_param_base {
@@ -102,9 +103,20 @@ void cuvs_cagra_hnswlib<T, IdxT>::build(const T* dataset, size_t nrow)
   cagra_wrapper.build(dataset, nrow);
   auto& cagra_index = *cagra_wrapper.get_index();

-  // pass the dataset directly to HNSW if it's on the host
+  const bool is_ace_disk_build = cagra_index.dataset_fd().has_value() &&
+                                 cagra_index.graph_fd().has_value() &&
+                                 cagra_index.mapping_fd().has_value();
+
   std::optional<raft::host_matrix_view<const T, int64_t>> opt_dataset_view = std::nullopt;
-  if (dataset_is_on_host) {
+  // Regular CAGRA build: pass the dataset directly to HNSW if it's on the host
+  if (dataset_is_on_host && !is_ace_disk_build) {
+    opt_dataset_view.emplace(
+      raft::make_host_matrix_view<const T, int64_t>(dataset, nrow, this->dim_));
+  }
+  // ACE disk build with original id graph: pass the dataset directly to HNSW to remap the graph to
+  // original ids
+  if (is_ace_disk_build && build_param_.use_original_id_graph) {
+    RAFT_EXPECTS(dataset_is_on_host, "Dataset must be on host for original id graph remapping.");
     opt_dataset_view.emplace(
       raft::make_host_matrix_view<const T, int64_t>(dataset, nrow, this->dim_));
   }
diff --git a/cpp/include/cuvs/neighbors/hnsw.hpp b/cpp/include/cuvs/neighbors/hnsw.hpp
index 7ee91f18ba..b33f17767a 100644
--- a/cpp/include/cuvs/neighbors/hnsw.hpp
+++ b/cpp/include/cuvs/neighbors/hnsw.hpp
@@ -452,7 +452,8 @@ std::unique_ptr> build(
  * @param[in] res raft resources
  * @param[in] params hnsw index parameters
  * @param[in] cagra_index cagra index
- * @param[in] dataset optional dataset to avoid extra memory copy when hierarchy is `CPU`
+ * @param[in] dataset optional dataset in the original row order. When provided for a disk-backed
+ * ACE index, cuVS remaps the on-disk ACE graph back to original ids before exporting.
 *
 * Usage example:
 * @code{.cpp}
@@ -488,7 +489,8 @@ std::unique_ptr> from_cagra(
  * @param[in] res raft resources
  * @param[in] params hnsw index parameters
  * @param[in] cagra_index cagra index
- * @param[in] dataset optional dataset to avoid extra memory copy when hierarchy is `CPU`
+ * @param[in] dataset optional dataset in the original row order. When provided for a disk-backed
+ * ACE index, cuVS remaps the on-disk ACE graph back to original ids before exporting.
 *
 * Usage example:
 * @code{.cpp}
@@ -524,7 +526,8 @@ std::unique_ptr> from_cagra(
  * @param[in] res raft resources
  * @param[in] params hnsw index parameters
  * @param[in] cagra_index cagra index
- * @param[in] dataset optional dataset to avoid extra memory copy when hierarchy is `CPU`
+ * @param[in] dataset optional dataset in the original row order. When provided for a disk-backed
+ * ACE index, cuVS remaps the on-disk ACE graph back to original ids before exporting.
 *
 * Usage example:
 * @code{.cpp}
@@ -560,7 +563,8 @@ std::unique_ptr> from_cagra(
  * @param[in] res raft resources
  * @param[in] params hnsw index parameters
  * @param[in] cagra_index cagra index
- * @param[in] dataset optional dataset to avoid extra memory copy when hierarchy is `CPU`
+ * @param[in] dataset optional dataset in the original row order. When provided for a disk-backed
+ * ACE index, cuVS remaps the on-disk ACE graph back to original ids before exporting.
 *
 * Usage example:
 * @code{.cpp}
diff --git a/cpp/src/neighbors/detail/hnsw.hpp b/cpp/src/neighbors/detail/hnsw.hpp
index 4914a0fa1b..6b28e1dbf9 100644
--- a/cpp/src/neighbors/detail/hnsw.hpp
+++ b/cpp/src/neighbors/detail/hnsw.hpp
@@ -23,6 +23,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -820,6 +821,495 @@ void serialize_to_hnswlib_from_disk(raft::resources const& res,
   RAFT_LOG_INFO("HNSW serialization from disk complete in %ld ms", elapsed_time);
 }

+template <typename T, typename IdxT>
+std::pair<cuvs::util::file_descriptor, size_t> remap_disk_graph_to_original_ids(
+  const cuvs::neighbors::cagra::index<T, IdxT>& index_, const std::string& output_path)
+{
+  auto total_start           = std::chrono::high_resolution_clock::now();
+  const auto& graph_fd_opt   = index_.graph_fd();
+  const auto& mapping_fd_opt = index_.mapping_fd();
+
+  RAFT_EXPECTS(graph_fd_opt.has_value() && graph_fd_opt->is_valid(),
+               "Graph file descriptor is not available");
+  RAFT_EXPECTS(mapping_fd_opt.has_value() && mapping_fd_opt->is_valid(),
+               "Mapping file descriptor is not available");
+
+  const auto graph_path   = graph_fd_opt->get_path();
+  const auto mapping_path = mapping_fd_opt->get_path();
+  RAFT_EXPECTS(!graph_path.empty(), "Unable to get path from graph file descriptor");
+  RAFT_EXPECTS(!mapping_path.empty(), "Unable to get path from mapping file descriptor");
+
+  std::ifstream graph_stream(graph_path, std::ios::binary);
+  RAFT_EXPECTS(graph_stream.good(), "Failed to open graph file: %s", graph_path.c_str());
+  auto graph_header      = raft::detail::numpy_serializer::read_header(graph_stream);
+  auto graph_header_size = static_cast<size_t>(graph_stream.tellg());
+  RAFT_EXPECTS(graph_header.shape.size() == 2,
+               "Graph file should be 2D, got %zu dimensions",
+               graph_header.shape.size());
+
+  std::ifstream mapping_stream(mapping_path, std::ios::binary);
+  RAFT_EXPECTS(mapping_stream.good(), "Failed to open mapping file: %s", mapping_path.c_str());
+  auto mapping_header      = raft::detail::numpy_serializer::read_header(mapping_stream);
+  auto mapping_header_size = static_cast<size_t>(mapping_stream.tellg());
+  RAFT_EXPECTS(mapping_header.shape.size() == 1,
+               "Mapping file should be 1D, got %zu dimensions",
+               mapping_header.shape.size());
+
+  const auto n_rows       = graph_header.shape[0];
+  const auto graph_degree = graph_header.shape[1];
+  RAFT_EXPECTS(mapping_header.shape[0] == n_rows,
+               "Mapping size (%zu) must match graph rows (%zu)",
+               mapping_header.shape[0],
+               n_rows);
+
+  auto mapping = raft::make_host_vector<IdxT, int64_t>(n_rows);
+  cuvs::util::read_large_file(
+    *mapping_fd_opt, mapping.data_handle(), n_rows * sizeof(IdxT), mapping_header_size);
+  const auto* mapping_data = mapping.data_handle();
+
+  auto reordered_rows       = raft::make_host_vector<IdxT, int64_t>(n_rows);
+  auto* reordered_rows_data = reordered_rows.data_handle();
+#pragma omp parallel for
+  for (int64_t reordered_row = 0; reordered_row < static_cast<int64_t>(n_rows); ++reordered_row) {
+    reordered_rows_data[mapping_data[reordered_row]] = static_cast<IdxT>(reordered_row);
+  }
+
+  auto [output_fd, output_header_size] =
+    cuvs::util::create_numpy_file<IdxT>(output_path, {n_rows, graph_degree});
+
+  // Target batch size and coalescing thresholds. Larger batches require more RAM.
+  // Tune these values if you see performance issues.
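+  // For example (illustrative numbers, assuming 4-byte ids and graph_degree = 64): a row is
+  // 256 B, so the 64 MiB target yields ~262k rows per batch, reads within 512 rows of each
+  // other are coalesced, and a single coalesced read never covers more than 32768 rows (8 MiB).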
+  const size_t target_batch_bytes       = 64 * 1024 * 1024;
+  const size_t max_coalesced_gap_bytes  = 128 * 1024;
+  const size_t max_coalesced_span_bytes = 8 * 1024 * 1024;
+  const size_t row_bytes                = graph_degree * sizeof(IdxT);
+  const size_t batch_size               = std::max<size_t>(1, target_batch_bytes / row_bytes);
+  const size_t max_coalesced_gap_rows   = std::max<size_t>(1, max_coalesced_gap_bytes / row_bytes);
+  const size_t max_coalesced_span_rows  = std::max<size_t>(1, max_coalesced_span_bytes / row_bytes);
+
+  // Limit the number of read threads. Larger values require more RAM. 4-8 should be able to
+  // saturate most NVMe disks. Tune this value if you see I/O performance issues.
+  const int read_parallelism = std::min(cuvs::core::omp::get_max_threads(), 8);
+  auto output_batch =
+    raft::make_host_matrix<IdxT, int64_t>(batch_size, static_cast<int64_t>(graph_degree));
+  auto span_buffer = raft::make_host_vector<IdxT, int64_t>(static_cast<size_t>(read_parallelism) *
+                                                           max_coalesced_span_rows * graph_degree);
+  std::vector<size_t> read_order(batch_size);
+  auto* output_batch_data = output_batch.data_handle();
+  auto* span_buffer_data  = span_buffer.data_handle();
+
+  // Bucket sort read_order by coarse disk region if buckets are not too sparse.
+  const size_t bucket_size_rows = max_coalesced_span_rows;
+  const size_t num_buckets      = (n_rows + bucket_size_rows - 1) / bucket_size_rows;
+  const bool use_bucket_sort    = num_buckets > 0 && batch_size / num_buckets >= 4;
+  std::vector<size_t> bucket_offsets(use_bucket_sort ? num_buckets + 1 : 0);
+  std::vector<size_t> bucket_scatter(use_bucket_sort ? num_buckets : 0);
+
+  struct span_t {
+    size_t sorted_begin;
+    size_t sorted_end;
+    size_t first_reordered_row;
+    size_t span_rows;
+  };
+  std::vector<span_t> spans;
+  spans.reserve(batch_size);
+
+  size_t total_read_bytes  = 0;
+  size_t total_write_bytes = 0;
+  size_t total_span_count  = 0;
+
+  RAFT_LOG_INFO(
+    "HNSW remap: n_rows=%zu degree=%zu batch_size=%zu max_gap=%zu rows max_span=%zu rows "
+    "read_threads=%d",
+    n_rows,
+    graph_degree,
+    batch_size,
+    max_coalesced_gap_rows,
+    max_coalesced_span_rows,
+    read_parallelism);
+
+  for (size_t original_batch_start = 0; original_batch_start < n_rows;
+       original_batch_start += batch_size) {
+    const auto current_batch_size = std::min(batch_size, n_rows - original_batch_start);
+
+    const auto key_less = [&](size_t lhs, size_t rhs) {
+      return reordered_rows_data[original_batch_start + lhs] <
+             reordered_rows_data[original_batch_start + rhs];
+    };
+    if (use_bucket_sort) {
+      std::fill(bucket_offsets.begin(), bucket_offsets.end(), 0);
+      for (size_t k = 0; k < current_batch_size; ++k) {
+        const auto rr = reordered_rows_data[original_batch_start + k];
+        bucket_offsets[rr / bucket_size_rows + 1]++;
+      }
+      for (size_t b = 1; b <= num_buckets; ++b) {
+        bucket_offsets[b] += bucket_offsets[b - 1];
+      }
+      std::copy(
+        bucket_offsets.begin(), bucket_offsets.begin() + num_buckets, bucket_scatter.begin());
+      for (size_t k = 0; k < current_batch_size; ++k) {
+        const auto rr = reordered_rows_data[original_batch_start + k];
+        read_order[bucket_scatter[rr / bucket_size_rows]++] = k;
+      }
+      for (size_t b = 0; b < num_buckets; ++b) {
+        const auto lo = bucket_offsets[b];
+        const auto hi = bucket_offsets[b + 1];
+        if (hi - lo > 1) { std::sort(read_order.begin() + lo, read_order.begin() + hi, key_less); }
+      }
+    } else {
+      for (size_t k = 0; k < current_batch_size; ++k) {
+        read_order[k] = k;
+      }
+      std::sort(read_order.begin(), read_order.begin() + current_batch_size, key_less);
+    }
+
+    // Precompute coalesced spans so reads and remap can run in parallel across spans.
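+    // A span is a run of sorted reads whose total extent stays within max_coalesced_span_rows
+    // and whose inter-read gaps stay within max_coalesced_gap_rows; gap rows are read and
+    // discarded, trading bounded read amplification for fewer, larger file reads.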
+    spans.clear();
+    for (size_t sorted_pos = 0; sorted_pos < current_batch_size;) {
+      const auto first_reordered_row =
+        static_cast<size_t>(reordered_rows_data[original_batch_start + read_order[sorted_pos]]);
+      size_t span_end_pos       = sorted_pos + 1;
+      size_t last_reordered_row = first_reordered_row;
+      while (span_end_pos < current_batch_size) {
+        const auto next_reordered_row =
+          static_cast<size_t>(reordered_rows_data[original_batch_start + read_order[span_end_pos]]);
+        const auto gap_rows  = next_reordered_row - last_reordered_row;
+        const auto span_rows = next_reordered_row - first_reordered_row + 1;
+        if (gap_rows > max_coalesced_gap_rows || span_rows > max_coalesced_span_rows) { break; }
+        last_reordered_row = next_reordered_row;
+        ++span_end_pos;
+      }
+      spans.push_back({sorted_pos,
+                       span_end_pos,
+                       first_reordered_row,
+                       last_reordered_row - first_reordered_row + 1});
+      sorted_pos = span_end_pos;
+    }
+
+    size_t batch_read_bytes = 0;
+#pragma omp parallel for num_threads(read_parallelism) reduction(+ : batch_read_bytes)
+    for (int64_t span_idx = 0; span_idx < static_cast<int64_t>(spans.size()); ++span_idx) {
+      const auto& s    = spans[span_idx];
+      const int tid    = cuvs::core::omp::get_thread_num();
+      IdxT* tid_buffer = span_buffer_data + tid * max_coalesced_span_rows * graph_degree;
+      const auto input_offset =
+        static_cast<size_t>(graph_header_size + s.first_reordered_row * row_bytes);
+      const auto bytes_to_read = s.span_rows * row_bytes;
+      cuvs::util::read_large_file(*graph_fd_opt, tid_buffer, bytes_to_read, input_offset);
+      batch_read_bytes += bytes_to_read;

+      for (size_t pos = s.sorted_begin; pos < s.sorted_end; ++pos) {
+        const auto batch_idx      = read_order[pos];
+        const auto original_row   = original_batch_start + batch_idx;
+        const auto reordered_row  = static_cast<size_t>(reordered_rows_data[original_row]);
+        const auto local_row      = reordered_row - s.first_reordered_row;
+        const auto* graph_row_ptr = tid_buffer + local_row * graph_degree;
+        auto* output_row_ptr      = output_batch_data + batch_idx * graph_degree;
+        for (size_t neighbor_idx = 0; neighbor_idx < graph_degree; ++neighbor_idx) {
+          output_row_ptr[neighbor_idx] = mapping_data[graph_row_ptr[neighbor_idx]];
+        }
+      }
+    }
+
+    const auto output_offset =
+      static_cast<size_t>(output_header_size + original_batch_start * row_bytes);
+    const auto batch_write_bytes = current_batch_size * row_bytes;
+    cuvs::util::write_large_file(output_fd, output_batch_data, batch_write_bytes, output_offset);
+
+    total_read_bytes += batch_read_bytes;
+    total_write_bytes += batch_write_bytes;
+    total_span_count += spans.size();
+  }
+
+  const auto total_elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                               std::chrono::high_resolution_clock::now() - total_start)
+                               .count();
+  const double gib = static_cast<double>(1 << 30);
+  RAFT_LOG_INFO(
+    "HNSW remap: completed in %ld ms: read %.2f GiB across %zu spans (%.2fx amplification), "
+    "wrote %.2f GiB",
+    total_elapsed,
+    total_read_bytes / gib,
+    total_span_count,
+    total_write_bytes > 0 ? static_cast<double>(total_read_bytes) / total_write_bytes : 0.0,
+    total_write_bytes / gib);
+
+  return {std::move(output_fd), output_header_size};
+}
+
+template <typename T, typename IdxT>
+void serialize_to_hnswlib_with_original_dataset(
+  raft::resources const& res,
+  std::ostream& os_raw,
+  const cuvs::neighbors::hnsw::index_params& params,
+  const cuvs::neighbors::cagra::index<T, IdxT>& index_,
+  raft::host_matrix_view<const T, int64_t> dataset)
+{
+  raft::common::nvtx::range<raft::common::nvtx::domain::raft> fun_scope(
+    "cagra::serialize_original_order");
+
+  auto start_time = std::chrono::system_clock::now();
+  cuvs::util::buffered_ofstream os(&os_raw, 1 << 20 /*1MB*/);
+
+  RAFT_EXPECTS(index_.graph_fd().has_value() && index_.mapping_fd().has_value(),
+               "Function only implements serialization from disk-backed ACE graph.");
+  RAFT_EXPECTS(params.hierarchy != HnswHierarchy::CPU,
+               "Disk-to-disk serialization is not supported for the CPU hierarchy.");
+  RAFT_EXPECTS(static_cast<size_t>(dataset.extent(0)) == static_cast<size_t>(index_.size()),
+               "Dataset rows (%zu) must match index size (%zu)",
+               static_cast<size_t>(dataset.extent(0)),
+               static_cast<size_t>(index_.size()));
+  RAFT_EXPECTS(static_cast<size_t>(dataset.extent(1)) == static_cast<size_t>(index_.dim()),
+               "Dataset cols (%zu) must match index dimensions (%zu)",
+               static_cast<size_t>(dataset.extent(1)),
+               static_cast<size_t>(index_.dim()));
+
+  const auto& graph_fd   = index_.graph_fd();
+  std::string graph_path = graph_fd->get_path();
+  RAFT_EXPECTS(!graph_path.empty(), "Unable to get path from graph file descriptor");
+  const auto index_directory = std::filesystem::path(graph_path).parent_path().string();
+  const auto remapped_graph_path =
+    (std::filesystem::path(index_directory) / "cagra_graph_original_ids.npy").string();
+
+  auto [remapped_graph_fd, graph_header_size] =
+    remap_disk_graph_to_original_ids(index_, remapped_graph_path);
+
+  auto n_rows           = index_.size();
+  auto dim              = index_.dim();
+  auto graph_degree_int = static_cast<int>(index_.graph_degree());
+
+  RAFT_LOG_INFO(
+    "Saving CAGRA index to hnswlib format with original dataset order, size %zu, dim %zu, "
+    "graph_degree %zu",
+    static_cast<size_t>(n_rows),
+    static_cast<size_t>(dim),
+    static_cast<size_t>(graph_degree_int));
+
+  // Size the per-batch graph read buffer around a 64 MiB read target.
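+  // Unlike the remap step above, the remapped graph is already laid out in the original row
+  // order, so each batch below is a single sequential read; no gap/span coalescing is needed.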
+  const size_t target_batch_bytes = 64 * 1024 * 1024;
+  const size_t batch_size =
+    std::max<size_t>(1, target_batch_bytes / (graph_degree_int * sizeof(IdxT)));
+
+  auto graph_buffer = raft::make_host_matrix<IdxT, int64_t>(batch_size, graph_degree_int);
+
+  auto hnsw_index = std::make_unique<index_impl<T>>(dim, index_.metric(), params.hierarchy);
+
+  int odd_graph_degree = graph_degree_int % 2;
+  auto appr_algo = std::make_unique<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>>(
+    hnsw_index->get_space(), 1, (graph_degree_int + 1) / 2, params.ef_construction);
+
+  bool create_hierarchy = params.hierarchy != HnswHierarchy::NONE;
+
+  std::vector<size_t> hist;
+  std::vector<int64_t> order(n_rows);
+  std::vector<int64_t> order_bw(n_rows);
+  std::vector<int> levels(n_rows);
+  std::vector<int64_t> offsets;
+
+  if (create_hierarchy) {
+    RAFT_LOG_INFO("Sort points by levels");
+    for (int64_t i = 0; i < n_rows; i++) {
+      auto pt_level = appr_algo->getRandomLevel(appr_algo->mult_);
+      while (pt_level >= static_cast<int>(hist.size()))
+        hist.push_back(0);
+      hist[pt_level]++;
+      levels[i] = pt_level;
+    }
+
+    offsets.resize(hist.size() + 1, 0);
+    for (size_t i = 0; i < hist.size() - 1; i++) {
+      offsets[i + 1] = offsets[i] + hist[i];
+      RAFT_LOG_INFO("Level %zu : %zu", i + 1, size_t(n_rows) - offsets[i + 1]);
+    }
+
+    for (int64_t i = 0; i < n_rows; i++) {
+      auto pt_level              = levels[i];
+      order_bw[i]                = offsets[pt_level];
+      order[offsets[pt_level]++] = i;
+    }
+  }
+
+  appr_algo->enterpoint_node_ = create_hierarchy ? order.back() : n_rows / 2;
+  appr_algo->maxlevel_        = create_hierarchy ? hist.size() - 1 : 1;
+
+  os.write(reinterpret_cast<char*>(&appr_algo->offsetLevel0_), sizeof(std::size_t));
+  size_t num_elements = static_cast<size_t>(n_rows);
+  os.write(reinterpret_cast<char*>(&num_elements), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&num_elements), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->size_data_per_element_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->label_offset_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->offsetData_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->maxlevel_), sizeof(int));
+  os.write(reinterpret_cast<char*>(&appr_algo->enterpoint_node_), sizeof(int));
+  os.write(reinterpret_cast<char*>(&appr_algo->maxM_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->maxM0_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->M_), sizeof(std::size_t));
+  os.write(reinterpret_cast<char*>(&appr_algo->mult_), sizeof(double));
+  os.write(reinterpret_cast<char*>(&appr_algo->ef_construction_), sizeof(std::size_t));
+
+  auto host_query_set =
+    raft::make_host_matrix<T, int64_t>(create_hierarchy ? n_rows - hist[0] : 0, dim);
+
+  int64_t d_report_offset    = n_rows / 10;
+  int64_t next_report_offset = d_report_offset;
+  auto start_clock           = std::chrono::system_clock::now();
+
+  RAFT_EXPECTS(appr_algo->size_data_per_element_ ==
+                 dim * sizeof(T) + appr_algo->maxM0_ * sizeof(IdxT) + sizeof(int) + sizeof(size_t),
+               "Size data per element mismatch");
+
+  RAFT_LOG_INFO("Writing base level");
+  size_t bytes_written = 0;
+  float GiB            = 1 << 30;
+  IdxT zero            = 0;
+
+  for (int64_t batch_start = 0; batch_start < n_rows; batch_start += batch_size) {
+    const int64_t current_batch_size = std::min<int64_t>(batch_size, n_rows - batch_start);
+
+    const size_t graph_bytes = current_batch_size * graph_degree_int * sizeof(IdxT);
+    const off_t graph_offset = graph_header_size + batch_start * graph_degree_int * sizeof(IdxT);
+    auto bytes_read =
+      pread(remapped_graph_fd.get(), graph_buffer.data_handle(), graph_bytes, graph_offset);
+    RAFT_EXPECTS(bytes_read == static_cast<ssize_t>(graph_bytes),
+                 "Failed to read remapped graph data: expected %zu, got %zd",
+                 graph_bytes,
+                 bytes_read);
+
+    for (int64_t batch_idx = 0; batch_idx < current_batch_size; batch_idx++) {
+      const int64_t i = batch_start + batch_idx;
+
+      os.write(reinterpret_cast<char*>(&graph_degree_int), sizeof(int));
+
+      const IdxT* graph_row = &graph_buffer(batch_idx, 0);
+      os.write(reinterpret_cast<const char*>(graph_row), sizeof(IdxT) * graph_degree_int);
+
+      if (odd_graph_degree) {
+        RAFT_EXPECTS(odd_graph_degree == static_cast<int>(appr_algo->maxM0_) - graph_degree_int,
+                     "Odd graph degree mismatch");
+        os.write(reinterpret_cast<char*>(&zero), sizeof(IdxT));
+      }
+
+      const T* data_row = &dataset(batch_start + batch_idx, 0);
+      os.write(reinterpret_cast<const char*>(data_row), sizeof(T) * dim);
+
+      if (create_hierarchy && levels[i] > 0) {
+        std::copy(data_row, data_row + dim, &host_query_set(order_bw[i] - hist[0], 0));
+      }
+
+      auto label = static_cast<std::size_t>(i);
+      os.write(reinterpret_cast<char*>(&label), sizeof(std::size_t));
+
+      bytes_written += appr_algo->size_data_per_element_;
+
+      const auto end_clock = std::chrono::system_clock::now();
+      if (i > next_report_offset) {
+        next_report_offset += d_report_offset;
+        const auto time =
+          std::chrono::duration_cast<std::chrono::microseconds>(end_clock - start_clock).count() *
+          1e-6;
+        float throughput      = bytes_written / GiB / time;
+        float rows_throughput = i / time;
+        float ETA             = (n_rows - i) / rows_throughput;
+        RAFT_LOG_INFO(
+          "# Writing rows %12lu / %12lu (%3.2f %%), %3.2f GiB/sec, ETA %d:%3.1f, written %3.2f "
+          "GiB\r",
+          i,
+          n_rows,
+          i / static_cast<double>(n_rows) * 100,
+          throughput,
+          int(ETA / 60),
+          std::fmod(ETA, 60.0f),
+          bytes_written / GiB);
+      }
+    }
+  }
+
+  RAFT_LOG_INFO("Writing upper layers");
+  std::vector<std::optional<raft::host_matrix<IdxT, int64_t>>> host_neighbors;
+  host_neighbors.resize(hist.size());
+  for (size_t pt_level = 1; create_hierarchy && pt_level < hist.size(); pt_level++) {
+    common::nvtx::range<common::nvtx::domain::raft> level_scope("level %zu", pt_level);
+    auto start_idx     = offsets[pt_level - 1];
+    auto end_idx       = offsets[hist.size() - 1];
+    auto num_pts       = end_idx - start_idx;
+    auto neighbor_size = num_pts > appr_algo->M_ ? appr_algo->M_ : num_pts - 1;
+    if (num_pts <= 1) {
+      host_neighbors[pt_level - 1] = std::nullopt;
+      continue;
+    }
+
+    auto view = raft::make_host_matrix_view<const T, int64_t>(
+      &host_query_set(start_idx - hist[0], 0), num_pts, dim);
+    host_neighbors[pt_level - 1].emplace(
+      raft::make_host_matrix<IdxT, int64_t>(num_pts, neighbor_size));
+    all_neighbors_graph(res, view, host_neighbors[pt_level - 1]->view(), index_.metric());
+  }
+
+  next_report_offset = d_report_offset;
+  for (int64_t i = 0; i < n_rows; i++) {
+    size_t cur_level = create_hierarchy ? levels[i] : 0;
+    unsigned int linkListSize =
+      create_hierarchy && cur_level > 0 ? appr_algo->size_links_per_element_ * cur_level : 0;
+    os.write(reinterpret_cast<char*>(&linkListSize), sizeof(int));
+    bytes_written += sizeof(int);
+    if (linkListSize) {
+      for (size_t pt_level = 1; pt_level <= cur_level; pt_level++) {
+        unsigned int extent = 0;
+        if (host_neighbors[pt_level - 1].has_value()) {
+          auto neighbor_view = host_neighbors[pt_level - 1]->view();
+          auto my_row        = order_bw[i] - offsets[pt_level - 1];
+          IdxT* neighbors    = &neighbor_view(my_row, 0);
+          extent             = neighbor_view.extent(1);
+          os.write(reinterpret_cast<char*>(&extent), sizeof(int));
+          for (unsigned int j = 0; j < extent; j++) {
+            const IdxT converted = order[neighbors[j] + offsets[pt_level - 1]];
+            os.write(reinterpret_cast<const char*>(&converted), sizeof(IdxT));
+          }
+        } else {
+          os.write(reinterpret_cast<char*>(&extent), sizeof(int));
+        }
+        auto remainder = appr_algo->M_ - extent;
+        for (size_t j = 0; j < remainder; j++) {
+          os.write(reinterpret_cast<char*>(&zero), sizeof(IdxT));
+        }
+        bytes_written += (extent + remainder) * sizeof(IdxT) + sizeof(int);
+        RAFT_EXPECTS(
+          appr_algo->size_links_per_element_ == (extent + remainder) * sizeof(IdxT) + sizeof(int),
+          "Size links per element mismatch");
+      }
+    }
+
+    const auto end_clock = std::chrono::system_clock::now();
+    if (i > next_report_offset) {
+      next_report_offset += d_report_offset;
+      const auto time =
+        std::chrono::duration_cast<std::chrono::microseconds>(end_clock - start_clock).count() *
+        1e-6;
+      float throughput      = bytes_written / GiB / time;
+      float rows_throughput = i / time;
+      float ETA             = (n_rows - i) / rows_throughput;
+      RAFT_LOG_INFO(
+        "# Writing rows %12lu / %12lu (%3.2f %%), %3.2f GiB/sec, ETA %d:%3.1f, written %3.2f GiB\r",
+        i,
+        n_rows,
+        i / static_cast<double>(n_rows) * 100,
+        throughput,
+        int(ETA / 60),
+        std::fmod(ETA, 60.0f),
+        bytes_written / GiB);
+    }
+  }
+
+  os.flush();
+  os_raw.flush();
+  if (!os_raw.good()) { RAFT_LOG_WARN("Output stream is not in good state after serialization"); }
+
+  auto end_time = std::chrono::system_clock::now();
+  auto elapsed_time =
+    std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
+  RAFT_LOG_INFO("HNSW serialization with original dataset order complete in %ld ms", elapsed_time);
+}
+
 template 
 std::enable_if_t>> from_cagra(
   raft::resources const& res,
@@ -1095,7 +1585,11 @@ std::unique_ptr<index<T>> from_cagra(
   RAFT_EXPECTS(of, "Cannot open file %s", index_filename.c_str());

-  serialize_to_hnswlib_from_disk(res, of, params, cagra_index);
+  if (dataset.has_value()) {
+    serialize_to_hnswlib_with_original_dataset(res, of, params, cagra_index, dataset.value());
+  } else {
+    serialize_to_hnswlib_from_disk(res, of, params, cagra_index);
+  }

   of.close();
   RAFT_EXPECTS(of, "Error writing output %s", index_filename.c_str());
@@ -1312,8 +1806,10 @@ std::unique_ptr<index<T>> build(raft::resources const& res,
   RAFT_LOG_INFO("hnsw::build - Converting CAGRA index to HNSW format");

-  // Convert CAGRA index to HNSW index
-  return from_cagra(res, params, cagra_index, dataset);
+  // Convert CAGRA index to HNSW index. The resulting HNSW index uses the partitioned ACE index
+  // order. See `cagra::build` and `hnsw::from_cagra` for more details on how to remap the graph to
+  // original ids.
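+  // For example (illustrative only), a caller that still holds the original host dataset can run
+  //   hnsw::from_cagra(res, params, cagra_index, raft::make_const_mdspan(dataset));
+  // to obtain an HNSW index whose graph and labels use the original row ids instead.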
+ return from_cagra(res, params, cagra_index, std::nullopt); } } // namespace cuvs::neighbors::hnsw::detail diff --git a/cpp/tests/neighbors/ann_hnsw_ace.cuh b/cpp/tests/neighbors/ann_hnsw_ace.cuh index 78ae54d9fc..1d1b5b2dbe 100644 --- a/cpp/tests/neighbors/ann_hnsw_ace.cuh +++ b/cpp/tests/neighbors/ann_hnsw_ace.cuh @@ -7,9 +7,14 @@ #include "ann_cagra.cuh" #include +#include + +#include #include +#include #include +#include namespace cuvs::neighbors::hnsw { @@ -57,6 +62,123 @@ class AnnHnswAceTest : public ::testing::TestWithParam { } protected: + auto load_npy_1d(const std::string& path) -> std::vector + { + cuvs::util::file_descriptor fd(path, O_RDONLY); + std::ifstream stream(path, std::ios::binary); + EXPECT_TRUE(stream.is_open()); + if (!stream.is_open()) { return {}; } + auto header = raft::detail::numpy_serializer::read_header(stream); + auto offset = stream.tellg(); + EXPECT_NE(offset, std::streampos(-1)); + if (offset == std::streampos(-1)) { return {}; } + EXPECT_EQ(header.shape.size(), 1); + if (header.shape.size() != 1) { return {}; } + std::vector data(header.shape[0]); + cuvs::util::read_large_file( + fd, data.data(), data.size() * sizeof(uint32_t), static_cast(offset)); + return data; + } + + auto load_npy_2d(const std::string& path) -> std::pair, std::vector> + { + cuvs::util::file_descriptor fd(path, O_RDONLY); + std::ifstream stream(path, std::ios::binary); + EXPECT_TRUE(stream.is_open()); + if (!stream.is_open()) { return {{}, {}}; } + auto header = raft::detail::numpy_serializer::read_header(stream); + auto offset = stream.tellg(); + EXPECT_NE(offset, std::streampos(-1)); + if (offset == std::streampos(-1)) { return {{}, {}}; } + EXPECT_EQ(header.shape.size(), 2); + if (header.shape.size() != 2) { return {{}, {}}; } + std::vector shape{header.shape[0], header.shape[1]}; + std::vector data(shape[0] * shape[1]); + cuvs::util::read_large_file( + fd, data.data(), data.size() * sizeof(uint32_t), static_cast(offset)); + return {std::move(data), std::move(shape)}; + } + + template + auto load_npy_2d_typed(const std::string& path) -> std::pair, std::vector> + { + cuvs::util::file_descriptor fd(path, O_RDONLY); + std::ifstream stream(path, std::ios::binary); + EXPECT_TRUE(stream.is_open()); + if (!stream.is_open()) { return {{}, {}}; } + auto header = raft::detail::numpy_serializer::read_header(stream); + auto offset = stream.tellg(); + EXPECT_NE(offset, std::streampos(-1)); + if (offset == std::streampos(-1)) { return {{}, {}}; } + EXPECT_EQ(header.shape.size(), 2); + if (header.shape.size() != 2) { return {{}, {}}; } + std::vector shape{header.shape[0], header.shape[1]}; + std::vector data(shape[0] * shape[1]); + cuvs::util::read_large_file( + fd, data.data(), data.size() * sizeof(T), static_cast(offset)); + return {std::move(data), std::move(shape)}; + } + + template + void verifyDiskArtifacts(const std::string& temp_dir, MatrixView original_dataset) + { + const auto mapping_path = temp_dir + "/dataset_mapping.npy"; + const auto reordered_data_path = temp_dir + "/reordered_dataset.npy"; + const auto reordered_graph_path = temp_dir + "/cagra_graph.npy"; + + ASSERT_TRUE(std::filesystem::exists(mapping_path)); + ASSERT_TRUE(std::filesystem::exists(reordered_data_path)); + ASSERT_TRUE(std::filesystem::exists(reordered_graph_path)); + + auto mapping = load_npy_1d(mapping_path); + auto [reordered_dataset, reordered_dataset_shape] = + load_npy_2d_typed(reordered_data_path); + auto [reordered_graph, reordered_shape] = load_npy_2d(reordered_graph_path); + + 
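+    // The checks below verify three disk artifacts: the mapping is a bijection on
+    // [0, n_rows), the reordered dataset equals dataset[mapping] row by row, and every
+    // graph neighbor id is a valid reordered row id.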
ASSERT_EQ(reordered_dataset_shape.size(), 2); + ASSERT_EQ(reordered_dataset_shape[0], mapping.size()); + ASSERT_EQ(reordered_dataset_shape[0], static_cast(original_dataset.extent(0))); + ASSERT_EQ(reordered_dataset_shape[1], static_cast(original_dataset.extent(1))); + + const auto dim = reordered_dataset_shape[1]; + std::vector seen_original_rows(mapping.size(), 0); + for (size_t reordered_row = 0; reordered_row < mapping.size(); ++reordered_row) { + const auto original_row = mapping[reordered_row]; + ASSERT_LT(static_cast(original_row), static_cast(original_dataset.extent(0))); + ASSERT_EQ(seen_original_rows[original_row], 0) + << "Mapping is not a permutation; original_row=" << original_row + << " appears more than once"; + seen_original_rows[original_row] = 1; + + const auto* reordered_row_ptr = reordered_dataset.data() + reordered_row * dim; + const auto* original_row_ptr = + original_dataset.data_handle() + static_cast(original_row) * dim; + ASSERT_EQ(std::memcmp(reordered_row_ptr, original_row_ptr, dim * sizeof(DataT)), 0) + << "Reordered dataset mismatch at reordered_row=" << reordered_row + << ", original_row=" << original_row; + } + for (size_t original_row = 0; original_row < seen_original_rows.size(); ++original_row) { + ASSERT_EQ(seen_original_rows[original_row], 1) + << "Mapping is not onto; original_row=" << original_row << " was never referenced"; + } + + ASSERT_EQ(reordered_shape.size(), 2); + ASSERT_EQ(reordered_shape[0], mapping.size()); + + const auto n_rows = reordered_shape[0]; + const auto graph_degree = reordered_shape[1]; + + for (size_t reordered_row = 0; reordered_row < n_rows; ++reordered_row) { + for (size_t neighbor_idx = 0; neighbor_idx < graph_degree; ++neighbor_idx) { + const auto reordered_neighbor = + reordered_graph[reordered_row * graph_degree + neighbor_idx]; + ASSERT_LT(static_cast(reordered_neighbor), mapping.size()) + << "Reordered graph neighbor out of range at reordered_row=" << reordered_row + << ", neighbor_idx=" << neighbor_idx; + } + } + } + void testHnswAceBuild() { size_t queries_size = ps.n_queries * ps.k; @@ -117,6 +239,7 @@ class AnnHnswAceTest : public ::testing::TestWithParam { hnsw::build(handle_, hnsw_params, raft::make_const_mdspan(database_host.view())); ASSERT_NE(hnsw_index, nullptr); + if (ps.use_disk) { verifyDiskArtifacts(temp_dir, database_host.view()); } // Prepare queries on host auto queries_host = raft::make_host_matrix(ps.n_queries, ps.dim); @@ -253,12 +376,188 @@ class AnnHnswAceTest : public ::testing::TestWithParam { << "Graph file should exist when memory limit triggers disk mode fallback"; EXPECT_TRUE(std::filesystem::exists(reordered_file)) << "Reordered dataset file should exist when memory limit triggers disk mode fallback"; + verifyDiskArtifacts(temp_dir, database_host.view()); } // Clean up temporary directory std::filesystem::remove_all(temp_dir); } + // Verifies `hnsw::from_cagra` using a disk-backed ACE index with remapped graph. + void testHnswAceFromCagraRemap() + { + // This test only exercises the disk-backed ACE path. 
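+    // (The parameterizations in generate_hnsw_ace_from_cagra_remap_inputs() always set
+    // use_disk = true, so this assertion guards against future input changes.)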
+ ASSERT_TRUE(ps.use_disk) << "testHnswAceFromCagraRemap requires use_disk=true"; + + size_t queries_size = ps.n_queries * ps.k; + std::vector indexes_naive(queries_size); + std::vector distances_naive(queries_size); + { + rmm::device_uvector distances_naive_dev(queries_size, stream_); + rmm::device_uvector indexes_naive_dev(queries_size, stream_); + + cuvs::neighbors::naive_knn(handle_, + distances_naive_dev.data(), + indexes_naive_dev.data(), + search_queries.data(), + database_dev.data(), + ps.n_queries, + ps.n_rows, + ps.dim, + ps.k, + ps.metric); + raft::update_host(distances_naive.data(), distances_naive_dev.data(), queries_size, stream_); + raft::update_host(indexes_naive.data(), indexes_naive_dev.data(), queries_size, stream_); + raft::resource::sync_stream(handle_); + } + + std::string temp_dir = std::string("/tmp/cuvs_hnsw_ace_fromcagra_test_") + + std::to_string(std::time(nullptr)) + "_" + + std::to_string(reinterpret_cast(this)); + std::filesystem::create_directories(temp_dir); + + auto database_host = raft::make_host_matrix(ps.n_rows, ps.dim); + raft::copy(database_host.data_handle(), database_dev.data(), ps.n_rows * ps.dim, stream_); + raft::resource::sync_stream(handle_); + + cuvs::neighbors::cagra::index_params cagra_params; + cagra_params.metric = ps.metric; + cagra_params.intermediate_graph_degree = 128; + cagra_params.graph_degree = 64; + + auto cagra_ace_params = graph_build_params::ace_params(); + cagra_ace_params.npartitions = ps.npartitions; + cagra_ace_params.ef_construction = ps.ef_construction; + cagra_ace_params.build_dir = temp_dir; + cagra_ace_params.use_disk = true; + cagra_ace_params.max_host_memory_gb = ps.max_host_memory_gb; + cagra_ace_params.max_gpu_memory_gb = ps.max_gpu_memory_gb; + cagra_params.graph_build_params = cagra_ace_params; + + auto cagra_index = cuvs::neighbors::cagra::build( + handle_, cagra_params, raft::make_const_mdspan(database_host.view())); + raft::resource::sync_stream(handle_); + + // The CAGRA ACE build writes the reordered artifacts unconditionally for now. + const auto mapping_path = temp_dir + "/dataset_mapping.npy"; + const auto reordered_graph_path = temp_dir + "/cagra_graph.npy"; + ASSERT_TRUE(std::filesystem::exists(mapping_path)); + ASSERT_TRUE(std::filesystem::exists(reordered_graph_path)); + + auto mapping = load_npy_1d(mapping_path); + auto [reordered_graph, reordered_shape] = load_npy_2d(reordered_graph_path); + ASSERT_EQ(mapping.size(), static_cast(ps.n_rows)); + ASSERT_EQ(reordered_shape.size(), 2); + ASSERT_EQ(reordered_shape[0], mapping.size()); + const auto graph_degree = reordered_shape[1]; + + // HNSW params shared across both `from_cagra` calls. 
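+    // NB: the remap path rejects HnswHierarchy::CPU (see
+    // serialize_to_hnswlib_with_original_dataset), so the GPU hierarchy is used here.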
+ hnsw::index_params hnsw_params; + hnsw_params.metric = ps.metric; + hnsw_params.hierarchy = hnsw::HnswHierarchy::GPU; + hnsw_params.M = 32; + hnsw_params.ef_construction = ps.ef_construction; + + const auto hnsw_bin_path = temp_dir + "/hnsw_index.bin"; + const auto reordered_bin_path = temp_dir + "/hnsw_index_reordered.bin"; + const auto original_graph_path = temp_dir + "/cagra_graph_original_ids.npy"; + + // Path 1: from_cagra with reordered graph + { + auto hnsw_index_reordered = hnsw::from_cagra(handle_, hnsw_params, cagra_index); + ASSERT_NE(hnsw_index_reordered, nullptr); + } + ASSERT_TRUE(std::filesystem::exists(hnsw_bin_path)) + << "hnsw::from_cagra should write hnsw_index.bin next to the disk-backed CAGRA graph"; + EXPECT_FALSE(std::filesystem::exists(original_graph_path)) + << "cagra_graph_original_ids.npy must NOT be produced on the reordered from_cagra path"; + + // Move the reordered HNSW binary aside so the remap path can overwrite + // hnsw_index.bin without clobbering the first variant. + std::filesystem::rename(hnsw_bin_path, reordered_bin_path); + + // Path 2: from_cagra with remapped graph + { + auto hnsw_index_remapped = hnsw::from_cagra( + handle_, hnsw_params, cagra_index, raft::make_const_mdspan(database_host.view())); + ASSERT_NE(hnsw_index_remapped, nullptr); + } + ASSERT_TRUE(std::filesystem::exists(hnsw_bin_path)) + << "hnsw::from_cagra(dataset) should write hnsw_index.bin next to the CAGRA graph"; + ASSERT_TRUE(std::filesystem::exists(original_graph_path)) + << "cagra_graph_original_ids.npy must be produced when the dataset is passed to from_cagra"; + + auto [original_graph, original_shape] = load_npy_2d(original_graph_path); + ASSERT_EQ(original_shape.size(), 2); + ASSERT_EQ(original_shape[0], reordered_shape[0]); + ASSERT_EQ(original_shape[1], reordered_shape[1]); + + // Invariant: for every reordered row i and neighbor slot j, + // original_graph[mapping[i], j] == mapping[reordered_graph[i, j]]. 
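+    // This is the C++ twin of the NumPy check in test_hnsw_ace.py:
+    //   expected_original_graph[mapping] = mapping[reordered_graph]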
+ for (size_t reordered_row = 0; reordered_row < mapping.size(); ++reordered_row) { + const auto original_row = mapping[reordered_row]; + ASSERT_LT(static_cast(original_row), mapping.size()); + for (size_t j = 0; j < graph_degree; ++j) { + const auto reordered_neighbor = reordered_graph[reordered_row * graph_degree + j]; + ASSERT_LT(static_cast(reordered_neighbor), mapping.size()); + const auto expected_original_neighbor = mapping[reordered_neighbor]; + const auto actual_original_neighbor = + original_graph[static_cast(original_row) * graph_degree + j]; + ASSERT_EQ(actual_original_neighbor, expected_original_neighbor) + << "Remapped graph mismatch at reordered_row=" << reordered_row + << ", original_row=" << original_row << ", neighbor_idx=" << j; + } + } + + // Both deserialized HNSW indices return original-id labels + auto queries_host = raft::make_host_matrix(ps.n_queries, ps.dim); + raft::copy(queries_host.data_handle(), search_queries.data(), ps.n_queries * ps.dim, stream_); + raft::resource::sync_stream(handle_); + + hnsw::search_params search_params; + search_params.ef = std::max(ps.ef_construction, ps.k * 2); + search_params.num_threads = 1; + + auto check_recall = [&](const std::string& bin_path, const char* label) { + hnsw::index* loaded = nullptr; + hnsw::deserialize(handle_, hnsw_params, bin_path, ps.dim, ps.metric, &loaded); + ASSERT_NE(loaded, nullptr) << label << ": failed to deserialize HNSW index from " << bin_path; + + auto indexes_hnsw_host = raft::make_host_matrix(ps.n_queries, ps.k); + auto distances_hnsw_host = raft::make_host_matrix(ps.n_queries, ps.k); + hnsw::search(handle_, + search_params, + *loaded, + queries_host.view(), + indexes_hnsw_host.view(), + distances_hnsw_host.view()); + + std::vector indexes_hnsw_converted(queries_size); + std::vector distances_hnsw(queries_size); + for (size_t i = 0; i < queries_size; ++i) { + indexes_hnsw_converted[i] = static_cast(indexes_hnsw_host.data_handle()[i]); + distances_hnsw[i] = distances_hnsw_host.data_handle()[i]; + } + + EXPECT_TRUE(cuvs::neighbors::eval_neighbours(indexes_naive, + indexes_hnsw_converted, + distances_naive, + distances_hnsw, + ps.n_queries, + ps.k, + 0.003, + ps.min_recall)) + << label << " HNSW from_cagra search failed recall check"; + + delete loaded; + }; + + check_recall(reordered_bin_path, "reordered"); + check_recall(hnsw_bin_path, "remapped"); + + std::filesystem::remove_all(temp_dir); + } + void SetUp() override { database_dev.resize(((size_t)ps.n_rows) * ps.dim, stream_); @@ -307,24 +606,45 @@ inline std::vector generate_hnsw_ace_inputs() // Inputs specifically for testing memory limit fallback to disk mode inline std::vector generate_hnsw_ace_memory_fallback_inputs() { - return { - // Test with L2 metric - {10, // n_queries - 5000, // n_rows - 64, // dim - 10, // k - 2, // npartitions - 100, // ef_construction - false, // use_disk (not explicitly set, should be triggered by memory limit) - cuvs::distance::DistanceType::L2Expanded, - 0.0, // min_recall (not checked in fallback test) - 0.001, // max_host_memory_gb (tiny limit to force disk mode) - 0.001} // max_gpu_memory_gb (tiny limit to force disk mode) - }; + return { // Test with L2 metric + {10, // n_queries + 5000, // n_rows + 64, // dim + 10, // k + 2, // npartitions + 100, // ef_construction + false, // use_disk (not explicitly set, should be triggered by memory limit) + cuvs::distance::DistanceType::L2Expanded, + 0.0, // min_recall (not checked in fallback test) + 0.001, // max_host_memory_gb (tiny limit to force disk mode) + 
0.001}}; // max_gpu_memory_gb (tiny limit to force disk mode) +} + +// Inputs specifically for the `hnsw::from_cagra` remap test. This test always +// runs against a disk-backed CAGRA index and calls `from_cagra` both with and +// without the original dataset, so `use_disk` must be true. +inline std::vector generate_hnsw_ace_from_cagra_remap_inputs() +{ + return raft::util::itertools::product( + {10}, // n_queries + {5000}, // n_rows + {64}, // dim + {10}, // k + {2}, // npartitions + {100}, // ef_construction + {true}, // use_disk (remap path requires disk mode) + {cuvs::distance::DistanceType::L2Expanded, + cuvs::distance::DistanceType::InnerProduct}, // metric + {0.7}, // min_recall + {0.0}, // max_host_memory_gb + {0.0} // max_gpu_memory_gb + ); } const std::vector hnsw_ace_inputs = generate_hnsw_ace_inputs(); const std::vector hnsw_ace_memory_fallback_inputs = generate_hnsw_ace_memory_fallback_inputs(); +const std::vector hnsw_ace_from_cagra_remap_inputs = + generate_hnsw_ace_from_cagra_remap_inputs(); } // namespace cuvs::neighbors::hnsw diff --git a/cpp/tests/neighbors/ann_hnsw_ace/test_float_uint32_t.cu b/cpp/tests/neighbors/ann_hnsw_ace/test_float_uint32_t.cu index 57b16d74e9..b730dfd52f 100644 --- a/cpp/tests/neighbors/ann_hnsw_ace/test_float_uint32_t.cu +++ b/cpp/tests/neighbors/ann_hnsw_ace/test_float_uint32_t.cu @@ -23,4 +23,15 @@ INSTANTIATE_TEST_CASE_P(AnnHnswAceMemoryFallbackTest, AnnHnswAceMemoryFallbackTest_float, ::testing::ValuesIn(hnsw_ace_memory_fallback_inputs)); +// Test for `hnsw::from_cagra` remap behavior +typedef AnnHnswAceTest AnnHnswAceFromCagraRemapTest_float; +TEST_P(AnnHnswAceFromCagraRemapTest_float, AnnHnswAceFromCagraRemap) +{ + this->testHnswAceFromCagraRemap(); +} + +INSTANTIATE_TEST_CASE_P(AnnHnswAceFromCagraRemapTest, + AnnHnswAceFromCagraRemapTest_float, + ::testing::ValuesIn(hnsw_ace_from_cagra_remap_inputs)); + } // namespace cuvs::neighbors::hnsw diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 11c581f767..d41b3a7e46 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -32,6 +32,7 @@ include(../cmake/thirdparty/get_cuvs.cmake) add_executable(BRUTE_FORCE_EXAMPLE src/brute_force_bitmap.cu) add_executable(CAGRA_EXAMPLE src/cagra_example.cu) add_executable(CAGRA_HNSW_ACE_EXAMPLE src/cagra_hnsw_ace_example.cu) +add_executable(HNSW_ACE_REORDER_EXAMPLE src/hnsw_ace_reorder_example.cu) add_executable(CAGRA_PERSISTENT_EXAMPLE src/cagra_persistent_example.cu) add_executable(DYNAMIC_BATCHING_EXAMPLE src/dynamic_batching_example.cu) add_executable(HNSW_ACE_EXAMPLE src/hnsw_ace_example.cu) @@ -45,6 +46,9 @@ add_executable(SCANN_EXAMPLE src/scann_example.cu) target_link_libraries(BRUTE_FORCE_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(CAGRA_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(CAGRA_HNSW_ACE_EXAMPLE PRIVATE cuvs::cuvs $) +target_link_libraries( + HNSW_ACE_REORDER_EXAMPLE PRIVATE cuvs::cuvs $ +) target_link_libraries( CAGRA_PERSISTENT_EXAMPLE PRIVATE cuvs::cuvs $ Threads::Threads ) diff --git a/examples/cpp/src/hnsw_ace_reorder_example.cu b/examples/cpp/src/hnsw_ace_reorder_example.cu new file mode 100644 index 0000000000..4e5f727346 --- /dev/null +++ b/examples/cpp/src/hnsw_ace_reorder_example.cu @@ -0,0 +1,235 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +// 1. Optionally quantize the dataset and queries to int8. +// 2. Build a CAGRA index using ACE. Store the HNSW layers on disk. +// 3. 
Build a HNSW index from the layers on disk and search. + +// On-disk layout written into `params.index_dir`: +// +// index_dir/ +// manifest.json +// cagra_graph_original_ids.npy # base layer [N, graph_degree] +// levels.npy # uint8 [N] +// layer_1_points.npy # uint32 [n_1] +// layer_1_graph.npy # uint32 [n_1, M] +// layer_1_degree.npy # uint32 [n_1] +// layer_2_{points,graph,degree}.npy +// ... + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "common.cuh" + +// When 1, scalar-quantize the float dataset to int8. +#define HNSW_ACE_REORDER_USE_QUANTIZATION 1 + +namespace { + +// Single directory holding both ACE scratch artifacts and the layered HNSW +// output. Matches the pattern in `hnsw_ace_example.cu`. +constexpr const char* kBuildDir = "/tmp/hnsw_ace_reorder"; + +} // namespace + +template +struct quantized_pair { + raft::host_matrix dataset; + raft::host_matrix queries; +}; + +quantized_pair quantize_dataset(raft::device_resources const& dev_resources, + raft::host_matrix_view dataset_float, + raft::host_matrix_view queries_float) +{ + std::cout << " quantize_dataset: training scalar quantizer (float -> int8)" << std::endl; + cuvs::preprocessing::quantize::scalar::params qp; + auto quantizer = cuvs::preprocessing::quantize::scalar::train(dev_resources, qp, dataset_float); + + auto dataset_i8 = + raft::make_host_matrix(dataset_float.extent(0), dataset_float.extent(1)); + cuvs::preprocessing::quantize::scalar::transform( + dev_resources, quantizer, dataset_float, dataset_i8.view()); + + auto queries_i8 = + raft::make_host_matrix(queries_float.extent(0), queries_float.extent(1)); + cuvs::preprocessing::quantize::scalar::transform( + dev_resources, quantizer, queries_float, queries_i8.view()); + + std::cout << " quantize_dataset: produced int8 dataset [" << dataset_i8.extent(0) << ", " + << dataset_i8.extent(1) << "] and queries [" << queries_i8.extent(0) << ", " + << queries_i8.extent(1) << "]" << std::endl; + return {std::move(dataset_i8), std::move(queries_i8)}; +} + +template +void hnsw_build_to_disk(raft::device_resources const& dev_resources, + raft::host_matrix_view dataset, + const std::string& build_dir) +{ + using namespace cuvs::neighbors; + + // HNSW index parameters -- identical in spirit to hnsw_ace_example.cu. + hnsw::index_params hnsw_params; + hnsw_params.metric = cuvs::distance::DistanceType::L2Expanded; + hnsw_params.hierarchy = hnsw::HnswHierarchy::GPU; + // graph_degree = 2 * M; intermediate_graph_degree = 3 * M. Higher M helps + // higher-dimensional or high-recall targets at the cost of memory. + hnsw_params.M = 32; + // ef_construction controls the candidate list size during hierarchy + // linking; larger values improve recall with diminishing returns. + hnsw_params.ef_construction = 120; + + // Parameters for the GPU-accelerated graph builder underneath HNSW. + auto ace_params = hnsw::graph_build_params::ace_params(); + // Number of ACE partitions. Small values can improve recall at the cost + // of perf/memory. Watch for imbalance (up to ~3x in practice). + ace_params.npartitions = 4; + // Disk-mode ACE writes intermediate artifacts (reordered dataset, dataset + // mapping, ACE-internal graph) under build_dir. `build_to_disk` reuses + // this same directory to land the final layered HNSW artifacts, so the + // whole pipeline produces a single self-describing output directory. 
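+  // The ACE memory caps (max_host_memory_gb / max_gpu_memory_gb) are left at their defaults
+  // here; tiny values would force disk mode even without use_disk = true (see the
+  // memory-fallback test in ann_hnsw_ace.cuh).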
+ ace_params.use_disk = true; + ace_params.build_dir = build_dir; + hnsw_params.graph_build_params = ace_params; + + std::cout << " hnsw_build_to_disk: writing layered HNSW index to " << build_dir << std::endl; + hnsw::build_to_disk(dev_resources, hnsw_params, dataset); + std::cout << " hnsw_build_to_disk: done (see " << build_dir << "/manifest.json)" << std::endl; +} + +template +void hnsw_search_from_disk(raft::device_resources const& dev_resources, + raft::host_matrix_view dataset, + raft::host_matrix_view queries, + const std::string& build_dir, + int64_t topk = 12) +{ + using namespace cuvs::neighbors; + + std::cout << " hnsw_search_from_disk: loading layered index from " << build_dir << std::endl; + // Metric, M, num_layers, entry point, etc. are read from + // `build_dir/manifest.json`; the dataset argument is what the loaded + // index is wired against for search-time distance computation. + auto hnsw_index = hnsw::load_from_disk(dev_resources, build_dir, dataset); + + const int64_t n_queries = queries.extent(0); + auto indices_hnsw_host = raft::make_host_matrix(n_queries, topk); + auto distances_hnsw_host = raft::make_host_matrix(n_queries, topk); + + hnsw::search_params search_params; + search_params.ef = std::max(200, static_cast(topk) * 2); + search_params.num_threads = 1; + + std::cout << " hnsw_search_from_disk: running HNSW search (top-" << topk << ")" << std::endl; + hnsw::search(dev_resources, + search_params, + *hnsw_index, + queries, + indices_hnsw_host.view(), + distances_hnsw_host.view()); + + // Narrow u64 -> u32 for the shared `print_results` helper. + auto neighbors = raft::make_device_matrix(dev_resources, n_queries, topk); + auto distances = raft::make_device_matrix(dev_resources, n_queries, topk); + auto neighbors_host = raft::make_host_matrix(n_queries, topk); + for (int64_t i = 0; i < n_queries; ++i) { + for (int64_t j = 0; j < topk; ++j) { + neighbors_host(i, j) = static_cast(indices_hnsw_host(i, j)); + } + } + raft::copy(neighbors.data_handle(), + neighbors_host.data_handle(), + n_queries * topk, + raft::resource::get_cuda_stream(dev_resources)); + raft::copy(distances.data_handle(), + distances_hnsw_host.data_handle(), + n_queries * topk, + raft::resource::get_cuda_stream(dev_resources)); + raft::resource::sync_stream(dev_resources); + + print_results(dev_resources, neighbors.view(), distances.view()); +} + +int main() +{ + raft::device_resources dev_resources; + + // Set pool memory resource with 1 GiB initial pool size. All allocations use the same pool. + rmm::mr::pool_memory_resource pool_mr(rmm::mr::get_current_device_resource_ref(), + 1024 * 1024 * 1024ull); + rmm::mr::set_current_device_resource(pool_mr); + + // Alternatively, one could define a pool allocator for temporary arrays (used within RAFT + // algorithms). In that case only the internal arrays would use the pool, any other allocation + // uses the default RMM memory resource. Here is how to change the workspace memory resource to + // a pool with 2 GiB upper limit. + // raft::resource::set_workspace_to_pool_resource(dev_resources, 2 * 1024 * 1024 * 1024ull); + +#if HNSW_ACE_REORDER_USE_QUANTIZATION + std::cout << "[stage 1] Generate and quantize dataset (float -> int8)" << std::endl; +#else + std::cout << "[stage 1] Generate dataset (float)" << std::endl; +#endif + + // ACE requires host-side data, so mirror the generated dataset and queries + // onto the host. 
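+  // raft::copy enqueues the copies on the stream; the sync_stream below guarantees the host
+  // mirrors are complete before they are read for quantization and the build.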
+ auto dataset_host = raft::make_host_matrix(n_samples, n_dim); + raft::copy(dataset_host.data_handle(), + dataset.data_handle(), + dataset.extent(0) * dataset.extent(1), + raft::resource::get_cuda_stream(dev_resources)); + auto queries_host = raft::make_host_matrix(n_queries, n_dim); + raft::copy(queries_host.data_handle(), + queries.data_handle(), + queries.extent(0) * queries.extent(1), + raft::resource::get_cuda_stream(dev_resources)); + raft::resource::sync_stream(dev_resources); + + auto dataset_host_view = raft::make_host_matrix_view( + dataset_host.data_handle(), n_samples, n_dim); + auto queries_host_view = raft::make_host_matrix_view( + queries_host.data_handle(), n_queries, n_dim); + + std::filesystem::create_directories(kBuildDir); + +#if HNSW_ACE_REORDER_USE_QUANTIZATION + auto q = quantize_dataset(dev_resources, dataset_host_view, queries_host_view); + + auto dataset_i8_view = raft::make_host_matrix_view( + q.dataset.data_handle(), n_samples, n_dim); + auto queries_i8_view = raft::make_host_matrix_view( + q.queries.data_handle(), n_queries, n_dim); + + std::cout << "[stage 2] Build HNSW layers and store on disk" << std::endl; + hnsw_build_to_disk(dev_resources, dataset_i8_view, kBuildDir); + + std::cout << "[stage 3] Build HNSW index from layers on disk and search" << std::endl; + hnsw_search_from_disk(dev_resources, dataset_i8_view, queries_i8_view, kBuildDir); +#else + std::cout << "[stage 2] Build HNSW layers and store on disk" << std::endl; + hnsw_build_to_disk(dev_resources, dataset_host_view, kBuildDir); + + std::cout << "[stage 3] Build HNSW index from layers on disk and search" << std::endl; + hnsw_search_from_disk(dev_resources, dataset_host_view, queries_host_view, kBuildDir); +#endif + + return 0; +} diff --git a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pxd b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pxd index 9ffb295ad3..15ef066e69 100644 --- a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pxd +++ b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pxd @@ -72,6 +72,13 @@ cdef extern from "cuvs/neighbors/hnsw.h" nogil: cuvsCagraIndex_t cagra_index, cuvsHnswIndex_t hnsw_index) except + + cuvsError_t cuvsHnswFromCagraWithDataset( + cuvsResources_t res, + cuvsHnswIndexParams_t params, + cuvsCagraIndex_t cagra_index, + cuvsHnswIndex_t hnsw_index, + DLManagedTensor* dataset_tensor) except + + cuvsError_t cuvsHnswBuild(cuvsResources_t res, cuvsHnswIndexParams_t params, DLManagedTensor* dataset, diff --git a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx index 6757695de3..c0b67f7ab0 100644 --- a/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx +++ b/python/cuvs/cuvs/neighbors/hnsw/hnsw.pyx @@ -408,7 +408,7 @@ def load(IndexParams index_params, filename, dim, dtype, metric="sqeuclidean", @auto_sync_resources def from_cagra(IndexParams index_params, cagra.Index cagra_index, - temporary_index_path=None, resources=None): + temporary_index_path=None, dataset=None, resources=None): """ Returns an HNSW index from a CAGRA index. @@ -436,6 +436,17 @@ def from_cagra(IndexParams index_params, cagra.Index cagra_index, temporary_index_path : string, default = None Path to save the temporary index file. If None, the temporary file will be saved in `/tmp/.bin`. + dataset : Host array interface compliant matrix, default = None (optional) + The original dataset used to build the CAGRA index, shape + ``(n_samples, dim)`` and dtype in + ``[float32, float16, int8, uint8]``. 
When provided for a disk-backed + CAGRA index produced by ACE, the HNSW index will be built against the + original row id space. The ACE-reordered graph is remapped back to + the original ids and the remapped graph is written to disk alongside + the other ACE artifacts as ``cagra_graph_original_ids.npy``. When + ``None`` (the default), the disk-backed HNSW index keeps the + ACE-reordered row ids and callers must apply ``dataset_mapping.npy`` + to translate search results back to the original id space. {resources_docstring} Examples @@ -457,12 +468,29 @@ def from_cagra(IndexParams index_params, cagra.Index cagra_index, cdef Index hnsw_index = Index() cdef cuvsResources_t res = resources.get_c_obj() - check_cuvs(cuvsHnswFromCagra( - res, - index_params.params, - cagra_index.index, - hnsw_index.index - )) + cdef cydlpack.DLManagedTensor* dataset_dlpack = NULL + + if dataset is None: + check_cuvs(cuvsHnswFromCagra( + res, + index_params.params, + cagra_index.index, + hnsw_index.index + )) + else: + dataset_ai = wrap_array(dataset) + _check_input_array(dataset_ai, [np.dtype('float32'), + np.dtype('float16'), + np.dtype('uint8'), + np.dtype('int8')]) + dataset_dlpack = cydlpack.dlpack_c(dataset_ai) + check_cuvs(cuvsHnswFromCagraWithDataset( + res, + index_params.params, + cagra_index.index, + hnsw_index.index, + dataset_dlpack + )) hnsw_index.trained = True return hnsw_index diff --git a/python/cuvs/cuvs/tests/test_hnsw_ace.py b/python/cuvs/cuvs/tests/test_hnsw_ace.py index 44138c883a..9344095f2f 100644 --- a/python/cuvs/cuvs/tests/test_hnsw_ace.py +++ b/python/cuvs/cuvs/tests/test_hnsw_ace.py @@ -10,7 +10,7 @@ from sklearn.neighbors import NearestNeighbors from sklearn.preprocessing import normalize -from cuvs.neighbors import hnsw +from cuvs.neighbors import cagra, hnsw from cuvs.tests.ann_utils import calc_recall, generate_data @@ -267,3 +267,114 @@ def test_hnsw_ace_tiny_memory_limit_triggers_disk_mode(): assert os.path.exists(reordered_file), ( "Reordered dataset file should exist when disk mode is triggered" ) + + +def test_hnsw_ace_from_cagra_remaps_graph_to_original_ids(): + """Verify `hnsw.from_cagra` for a disk-backed ACE index including remapping.""" + n_rows = 2048 + n_cols = 32 + n_queries = 64 + k = 10 + dtype = np.float32 + metric = "sqeuclidean" + + dataset = generate_data((n_rows, n_cols), dtype) + queries = generate_data((n_queries, n_cols), dtype) + + with tempfile.TemporaryDirectory() as temp_dir: + cagra_ace_params = cagra.AceParams( + npartitions=4, + ef_construction=120, + build_dir=temp_dir, + use_disk=True, + ) + cagra_build_params = cagra.IndexParams( + metric=metric, + intermediate_graph_degree=128, + graph_degree=64, + build_algo="ace", + ace_params=cagra_ace_params, + ) + + cagra_index = cagra.build(cagra_build_params, dataset) + assert cagra_index.trained + + mapping = np.load(os.path.join(temp_dir, "dataset_mapping.npy")) + reordered_dataset = np.load( + os.path.join(temp_dir, "reordered_dataset.npy") + ) + reordered_graph = np.load(os.path.join(temp_dir, "cagra_graph.npy")) + + # The mapping must be a permutation of [0, n_rows), and the + # reordered dataset must be consistent with it: each reordered row + # is the original row at position mapping[i]. + assert mapping.shape == (n_rows,) + assert np.array_equal(np.sort(mapping), np.arange(n_rows)) + np.testing.assert_array_equal(reordered_dataset, dataset[mapping]) + + # Ground truth in the original id space. 
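+        # Both HNSW variants below return labels in the original id space (the
+        # reordered index also stores original ids as hnswlib labels), so one
+        # set of exact neighbors suffices for both recall checks.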
+ nn_skl = NearestNeighbors( + n_neighbors=k, algorithm="brute", metric="sqeuclidean" + ) + nn_skl.fit(dataset) + skl_idx = nn_skl.kneighbors(queries, return_distance=False) + + hnsw_params = hnsw.IndexParams(hierarchy="gpu", metric=metric) + hnsw_bin_path = os.path.join(temp_dir, "hnsw_index.bin") + original_graph_path = os.path.join( + temp_dir, "cagra_graph_original_ids.npy" + ) + + # Path 1: from_cagra with reordered graph + hnsw.from_cagra(hnsw_params, cagra_index) + assert os.path.exists(hnsw_bin_path) + assert not os.path.exists(original_graph_path), ( + "cagra_graph_original_ids.npy must not be produced on the " + "reordered from_cagra path" + ) + + reordered_bin_path = os.path.join(temp_dir, "hnsw_index_reordered.bin") + os.replace(hnsw_bin_path, reordered_bin_path) + + # Path 2: from_cagra with remapped graph + hnsw.from_cagra(hnsw_params, cagra_index, dataset=dataset) + assert os.path.exists(hnsw_bin_path) + assert os.path.exists(original_graph_path), ( + "cagra_graph_original_ids.npy must be produced when the " + "original dataset is passed to from_cagra" + ) + + original_graph = np.load(original_graph_path) + + # Each row must be at the original row position and every neighbor + # id must be an original id. + assert original_graph.shape == reordered_graph.shape + expected_original_graph = np.empty_like(reordered_graph) + expected_original_graph[mapping] = mapping[reordered_graph] + np.testing.assert_array_equal(original_graph, expected_original_graph) + assert original_graph.min() >= 0 + assert original_graph.max() < n_rows + + search_params = hnsw.SearchParams(ef=200, num_threads=1) + + original_hnsw_index = hnsw.load( + hnsw_params, hnsw_bin_path, n_cols, dtype, metric=metric + ) + _, out_idx_original = hnsw.search( + search_params, original_hnsw_index, queries, k + ) + recall_original = calc_recall(np.asarray(out_idx_original), skl_idx) + assert recall_original >= 0.7, ( + f"Remapped HNSW recall {recall_original:.3f} below 0.7" + ) + + reordered_hnsw_index = hnsw.load( + hnsw_params, reordered_bin_path, n_cols, dtype, metric=metric + ) + _, out_idx_reordered = hnsw.search( + search_params, reordered_hnsw_index, queries, k + ) + recall_reordered = calc_recall(np.asarray(out_idx_reordered), skl_idx) + assert recall_reordered >= 0.7, ( + f"Reordered HNSW recall {recall_reordered:.3f} below 0.7" + )