diff --git a/CMakeLists.txt b/CMakeLists.txt index 5fa15b0d93..1f8e27dbe0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -254,6 +254,7 @@ message(STATUS "Binary name: ${NDD_BINARY_NAME}") set(NDD_CORE_SOURCES src/sparse/inverted_index.cpp src/utils/system_sanity/system_sanity.cpp + src/core/rebuild.cpp ) # Build non-main project sources separately so they can be compiled in parallel @@ -288,6 +289,7 @@ target_include_directories(ndd_core PRIVATE ${ASIO_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${CURL_INCLUDE_DIRS} + ${LIBARCHIVE_INCLUDE_DIR} ) target_include_directories(${NDD_BINARY_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src diff --git a/docs/logs.md b/docs/logs.md index 5ceb9d78b2..b8829df3c7 100644 --- a/docs/logs.md +++ b/docs/logs.md @@ -88,6 +88,7 @@ The same overload shapes apply to `LOG_WARN` and `LOG_ERROR`. - `1500s` metadata logs - `1600s` vector storage logs - `1700s` system sanity checks (CPU compatibility, disk, memory, ulimits) + - `1800s` rebuild subsystem logs - `2000s` index manager logs - `2100s` HNSW load/cache logs diff --git a/docs/rebuild.md b/docs/rebuild.md new file mode 100644 index 0000000000..a78342c344 --- /dev/null +++ b/docs/rebuild.md @@ -0,0 +1,134 @@ +# Index Rebuild + +Rebuild allows you to reconstruct an HNSW index graph with new configuration parameters (M, ef_construction) without re-uploading vector data. All vectors are re-indexed from MDBX storage — only the graph structure is rebuilt. + +## API Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/v1/index/{name}/rebuild` | Start async rebuild | +| GET | `/api/v1/index/{name}/rebuild/status` | Check rebuild progress | + +--- + +## Start Rebuild + +**POST** `/api/v1/index/{name}/rebuild` + +All parameters are optional. Omitted parameters retain their current values. 

```json
{
  "M": 32,
  "ef_con": 256
}
```

**Parameters:**

| Parameter | Type | Description |
|-----------|------|-------------|
| `M` | int | HNSW graph connectivity (4–512) |
| `ef_con` | int | Construction-time search quality (8–4096) |

**Response 202:**
```json
{
  "status": "rebuilding",
  "previous_config": { "M": 16, "ef_con": 128 },
  "new_config": { "M": 32, "ef_con": 256 },
  "total_vectors": 50000
}
```

**Errors:**

| Code | Condition |
|------|-----------|
| 400 | No changes specified, invalid parameters, empty index, or attempted to change `precision`/`space_type` |
| 404 | Index not found |
| 409 | Rebuild or backup already in progress for this user |

---

## Check Progress

**GET** `/api/v1/index/{name}/rebuild/status`

**Status values:**

| Status | Meaning |
|--------|---------|
| `idle` | No rebuild has run for this index (or querying a different index) |
| `in_progress` | Rebuild is currently running |
| `completed` | Rebuild finished successfully |
| `failed` | Rebuild failed (see `error` field) |

**In progress:**
```json
{
  "status": "in_progress",
  "vectors_processed": 45000,
  "total_vectors": 100000,
  "percent_complete": 45.0,
  "started_at": "2026-03-25T10:30:00Z"
}
```

**Completed:**
```json
{
  "status": "completed",
  "vectors_processed": 100000,
  "total_vectors": 100000,
  "percent_complete": 100.0,
  "started_at": "2026-03-25T10:30:00Z",
  "completed_at": "2026-03-25T10:32:15Z"
}
```

**Failed:**
```json
{
  "status": "failed",
  "vectors_processed": 45000,
  "total_vectors": 100000,
  "percent_complete": 45.0,
  "started_at": "2026-03-25T10:30:00Z",
  "completed_at": "2026-03-25T10:31:05Z",
  "error": "Out of memory"
}
```

Rebuild state is tracked per user and filtered by the queried index: querying the status of a different index than the one being (or last) rebuilt returns `idle`. The `completed`/`failed` state persists until the next rebuild is started for that user. 
+ +--- + +## Restrictions + +The following parameters **cannot** be changed via rebuild (returns 400): +- `precision` (quantization level) +- `space_type` + + +--- + +## Behavior + +- **All vectors are re-indexed** from MDBX storage into a new HNSW graph with the updated configuration. +- **Search continues** during rebuild — queries use the old index until the rebuild completes. +- **Write operations** (insert, delete, update) will block and timeout while the rebuild is running, same as during backup. +- **One rebuild at a time per user** — cannot start a rebuild on any index while another rebuild is in progress for the same user. Also cannot run concurrently with a backup. +- **Periodic checkpoints** — the in-progress graph is saved to a temp file at regular intervals. +- **On completion**, the new graph replaces `default.idx`. All temporary and intermediate files are cleaned up. +- **On server restart** during an incomplete rebuild, the old index loads normally. Orphaned temp files are removed automatically on startup. The rebuild must be restarted manually. To confirm a rebuild was incomplete, check that M/ef_con in the index info still show the original values. + +--- + +## Capacity and Timing + +**Disk space:** Plan for roughly **2× the index file size** free. A temporary copy of the completed graph is written before being renamed into place. + +**Memory:** Both the old and new graphs are in RAM simultaneously during rebuild. Peak usage is approximately **2× the index graph size** in addition to normal vector storage. + +**Duration:** Roughly 8-10 minutes per million vectors on commodity hardware at default settings. Higher M or ef_con increases build time. The final disk save adds additional time proportional to index size. 
diff --git a/src/core/ndd.hpp b/src/core/ndd.hpp index 55f6e5bc57..8d1907efdf 100644 --- a/src/core/ndd.hpp +++ b/src/core/ndd.hpp @@ -197,8 +197,11 @@ struct PersistenceConfig { }; #include "../storage/backup_store.hpp" +#include "rebuild.hpp" +#include "utils/types.hpp" class IndexManager { + friend class Rebuild; // executeJob accesses saveIndexInternal + metadata_manager_ private: std::deque indices_list_; std::unordered_map> indices_; @@ -220,8 +223,8 @@ class IndexManager { std::thread autosave_thread_; std::atomic running_{true}; BackupStore backup_store_; - void executeBackupJob(const std::string& index_id, const std::string& backup_name, - std::stop_token st); + Rebuild rebuild_; + void executeBackupJob(const std::string& index_id, const std::string& backup_name, std::stop_token st); std::unique_ptr createWAL(const std::string& index_id) { const std::string wal_dir = data_dir_ + "/" + index_id; @@ -581,6 +584,7 @@ class IndexManager { backup_store_(data_dir) { std::filesystem::create_directories(data_dir); metadata_manager_ = std::make_unique(data_dir); + rebuild_.cleanupTempFiles(data_dir); // Start the autosave thread autosave_thread_ = std::thread(&IndexManager::autosaveLoop, this); } @@ -589,9 +593,10 @@ class IndexManager { // Signal all threads to stop (running_ is checked by autosave and backup threads) running_ = false; - // Join background backup threads before destroying members - // (prevents use-after-free when detached threads outlive IndexManager) + // Join background backup and rebuild threads before destroying members + // (prevents use-after-free when threads outlive IndexManager) backup_store_.joinAllThreads(); + rebuild_.joinAllThreads(); /** * Don't wait for autosave thread to exit. 
@@ -1108,47 +1113,15 @@ class IndexManager { logInsertsAndUpdates(entry, numeric_ids); // Add to HNSW index in parallel using pre-quantized data from QuantVectorObject - size_t available_threads = settings::NUM_PARALLEL_INSERTS; - const size_t num_threads = (available_threads < quantized_vectors.size()) - ? available_threads - : quantized_vectors.size(); - std::vector threads; - const size_t chunk_size = - (quantized_vectors.size() + num_threads - 1) / num_threads; // Ceiling division - - threads.reserve(num_threads); - for(size_t t = 0; t < num_threads; t++) { - threads.emplace_back([&, t]() { - // Calculate start and end indices for this thread - size_t start_idx = t * chunk_size; - size_t end_idx = (start_idx + chunk_size < quantized_vectors.size()) - ? (start_idx + chunk_size) - : quantized_vectors.size(); - - // Process assigned chunk of vectors - for(size_t i = start_idx; i < end_idx; i++) { - const auto& quant_vec_obj = quantized_vectors[i]; - - // Use pre-quantized data directly from QuantVectorObject - no conversion - // needed! 
- const uint8_t* vector_data = quant_vec_obj.quant_vector.data(); - - // Add to HNSW index using pre-quantized raw bytes - if(numeric_ids[i].second) { - // If it's a new ID, add it to the index - entry.alg->addPoint(vector_data, numeric_ids[i].first); - } else { - // If it's an update, add it to the index - entry.alg->addPoint(vector_data, numeric_ids[i].first); - } + parallelAddPoints(quantized_vectors.size(), settings::NUM_PARALLEL_INSERTS, + [&](size_t i) { + const uint8_t* vector_data = quantized_vectors[i].quant_vector.data(); + if(numeric_ids[i].second) { + entry.alg->addPoint(vector_data, numeric_ids[i].first); + } else { + entry.alg->addPoint(vector_data, numeric_ids[i].first); } }); - } - - // Wait for all threads to complete - for(auto& thread : threads) { - thread.join(); - } entry.markDirty(); @@ -1928,6 +1901,74 @@ class IndexManager { std::pair uploadBackup(const std::string& backup_name, const std::string& username, const std::string& file_content); + + // Metadata access + std::optional getMetadata(const std::string& index_id) { + return metadata_manager_->getMetadata(index_id); + } + + // Reads live count from the in-memory HNSW graph; meta->total_elements can be stale between saves. 
+ size_t getElementCount(const std::string& index_id) { + auto entry = getIndexEntry(index_id); + return entry->alg->getElementsCount(); + } + + + // ========== Rebuild operations ========== + + // Return codes: + // 0: rebuild started successfully + // 1: index not found + // 2: rebuild or backup already in progress for this user + // 3: no configuration changes specified / invalid parameters + OperationResult rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con); + + bool hasActiveRebuild(const std::string& username) const { + return rebuild_.hasActiveRebuild(username); + } + + nlohmann::json getRebuildProgress(const std::string& username, + const std::string& index_id) const { + return rebuild_.getProgress(username, index_id); + } + + // Shared parallel addPoint utility — static chunk partition (same as addVectors). + // ProcessFn signature: void(size_t index) + template + static void parallelAddPoints(size_t count, size_t max_threads, ProcessFn&& process) { + if (count == 0) return; + size_t num_threads = std::min(max_threads, count); + const size_t chunk_size = (count + num_threads - 1) / num_threads; + std::vector threads; + threads.reserve(num_threads); + for (size_t t = 0; t < num_threads; ++t) { + threads.emplace_back([&, t]() { + size_t start_idx = t * chunk_size; + size_t end_idx = std::min(start_idx + chunk_size, count); + for (size_t i = start_idx; i < end_idx; ++i) { + process(i); + } + }); + } + for (auto& th : threads) { + th.join(); + } + } + + // Wires vector fetchers on an HNSW graph. Must be called before addPoint — searchBaseLayer + // during graph construction needs fetchers to compute distances for base-layer-only nodes. 
+ static void wireVectorFetchers(hnswlib::HierarchicalNSW* alg, + std::shared_ptr vs) { + alg->setVectorFetcher([vs](ndd::idInt label, uint8_t* buffer) { + return vs->get_vector(label, buffer); + }); + alg->setVectorFetcherBatch([vs](const ndd::idInt* labels, uint8_t* buffers, + bool* success, size_t count) -> size_t { + return vs->get_vectors_batch_into(labels, buffers, success, count); + }); + } }; // ========== IndexManager backup implementations ========== @@ -2221,6 +2262,7 @@ inline std::pair IndexManager::createBackupAsync(const std::s return {true, backup_name}; } + inline std::pair IndexManager::uploadBackup(const std::string& backup_name, const std::string& username, const std::string& file_content) { std::string user_backup_dir = backup_store_.getUserBackupDir(username); std::filesystem::create_directories(user_backup_dir); @@ -2280,4 +2322,67 @@ inline std::pair IndexManager::uploadBackup(const std::string backup_store_.writeBackupJson(username, backup_db); return {true, "Backup uploaded successfully"}; -} \ No newline at end of file +} + +// ========== IndexManager rebuild implementations ========== + +inline OperationResult IndexManager::rebuildIndexAsync(const std::string& index_id, + size_t new_M, + size_t new_ef_con) { + auto meta = metadata_manager_->getMetadata(index_id); + if (!meta) { + return {1, "Index not found"}; + } + + std::string username; + size_t pos = index_id.find('/'); + if (pos != std::string::npos) { + username = index_id.substr(0, pos); + } else { + return {3, "Invalid index ID format"}; + } + + if (backup_store_.hasActiveBackup(username)) { + return {2, "Backup already in progress for user: " + username}; + } + if (rebuild_.hasActiveRebuild(username)) { + return {2, "Rebuild already in progress for user: " + username}; + } + + // Pre-fetch entry now — captured by lambdas so the thread never calls getIndexEntry + auto entry = getIndexEntry(index_id); + size_t current_count = entry->alg->getElementsCount(); + + if (new_M == 
meta->M && new_ef_con == meta->ef_con) { + return {3, "No configuration changes specified"}; + } + + std::string base_path = data_dir_ + "/" + index_id; + std::string vector_storage_dir = base_path + "/vectors"; + + RebuildJobParams params{ + .username = username, + .new_M = new_M, + .new_ef_con = new_ef_con, + .entry = entry, + .manager = this, + .temp_path = Rebuild::getTempPath(base_path), + .timestamped_path = Rebuild::getTimestampedPath(base_path), + .index_path = vector_storage_dir + "/" + settings::DEFAULT_SUBINDEX + ".idx", + .num_parallel_inserts = settings::NUM_PARALLEL_INSERTS, + }; + + // Register state FIRST with empty thread — hasActiveRebuild() returns true immediately + rebuild_.setActiveRebuild(username, index_id, current_count); + + // Spawn thread — lambda calls rebuild_.executeJob directly (execution lives in Rebuild) + std::jthread t([this, params = std::move(params)](std::stop_token st) { + rebuild_.executeJob(params, st); + }); + + // Move real thread into the already-registered state + rebuild_.attachRebuildThread(username, std::move(t)); + + LOG_INFO(1800, index_id, "Rebuild started: M=" << new_M << " ef_con=" << new_ef_con); + return {0, "Rebuild started"}; +} diff --git a/src/core/rebuild.cpp b/src/core/rebuild.cpp new file mode 100644 index 0000000000..aca0c0201a --- /dev/null +++ b/src/core/rebuild.cpp @@ -0,0 +1,271 @@ +#include "rebuild.hpp" + +#include +#include +#include +#include + +#include "settings.hpp" +#include "log.hpp" +#include "utils/types.hpp" +#include "ndd.hpp" // CacheEntry, IndexManager (friend access) + +std::string Rebuild::statusToString(RebuildStatus s) { + switch (s) { + case RebuildStatus::IN_PROGRESS: return "in_progress"; + case RebuildStatus::COMPLETED: return "completed"; + case RebuildStatus::FAILED: return "failed"; + } + __builtin_unreachable(); +} + +std::string Rebuild::timeToISO8601(std::chrono::system_clock::time_point tp) { + auto time_t_val = std::chrono::system_clock::to_time_t(tp); + std::tm 
tm_val{}; + gmtime_r(&time_t_val, &tm_val); + std::ostringstream oss; + oss << std::put_time(&tm_val, "%Y-%m-%dT%H:%M:%SZ"); + return oss.str(); +} + +void Rebuild::cleanupTempFiles(const std::string& data_dir) { + if (!std::filesystem::exists(data_dir)) { + return; + } + try { + std::string temp_filename = std::string(settings::DEFAULT_SUBINDEX) + ".idx.temp"; + std::string ts_prefix = std::string(settings::DEFAULT_SUBINDEX) + ".idx."; + for (const auto& entry : std::filesystem::recursive_directory_iterator(data_dir)) { + if (!entry.is_regular_file()) continue; + const std::string fname = entry.path().filename().string(); + bool is_temp = (fname == temp_filename); + bool is_ts = fname.size() > ts_prefix.size() + && fname.substr(0, ts_prefix.size()) == ts_prefix + && std::all_of(fname.begin() + ts_prefix.size(), fname.end(), ::isdigit); + if (is_temp || is_ts) { + std::filesystem::remove(entry.path()); + } + } + } catch (const std::filesystem::filesystem_error& e) { + if (e.code() != std::errc::no_such_file_or_directory) + LOG_WARN(1803, "rebuild", "Error during temp cleanup: " << e.what()); + } catch (const std::exception& e) { + LOG_WARN(1803, "rebuild", "Error during temp cleanup: " << e.what()); + } +} + +void Rebuild::setActiveRebuild(const std::string& username, const std::string& index_id, + size_t total_vectors) { + std::lock_guard lock(rebuild_state_mutex_); + auto state = std::make_shared(); + state->index_id = index_id; + state->status = RebuildStatus::IN_PROGRESS; + state->total_vectors = total_vectors; + state->vectors_processed = 0; + state->started_at = std::chrono::system_clock::now(); + active_rebuilds_[username] = state; +} + +void Rebuild::completeActiveRebuild(const std::string& username) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + // Called from within the thread — detach so the jthread dtor doesn't join us + if (it->second->thread.joinable()) { + 
it->second->thread.detach(); + } + it->second->status = RebuildStatus::COMPLETED; + it->second->completed_at = std::chrono::system_clock::now(); + } +} + +void Rebuild::failActiveRebuild(const std::string& username, const std::string& error) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + // Called from within the thread — detach so the jthread dtor doesn't join us + if (it->second->thread.joinable()) { + it->second->thread.detach(); + } + it->second->status = RebuildStatus::FAILED; + it->second->error_message = error; + it->second->completed_at = std::chrono::system_clock::now(); + } +} + +bool Rebuild::hasActiveRebuild(const std::string& username) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + // Only IN_PROGRESS blocks a new rebuild + return it != active_rebuilds_.end() && it->second->status == RebuildStatus::IN_PROGRESS; +} + +void Rebuild::joinAllThreads() { + std::vector threads_to_join; + { + std::lock_guard lock(rebuild_state_mutex_); + for (auto& [username, state] : active_rebuilds_) { + if (state->thread.joinable()) { + threads_to_join.push_back(std::move(state->thread)); + } + } + active_rebuilds_.clear(); + } + for (auto& t : threads_to_join) { + t.request_stop(); + if (t.joinable()) { + t.join(); + } + } +} + +void Rebuild::attachRebuildThread(const std::string& username, std::jthread&& thread) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->thread = std::move(thread); + } +} + +void Rebuild::updateProgress(const std::string& username, size_t processed) { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end()) { + it->second->vectors_processed = processed; + } +} + +nlohmann::json Rebuild::getProgress(const std::string& username, const std::string& 
index_id) const { + std::lock_guard lock(rebuild_state_mutex_); + auto it = active_rebuilds_.find(username); + if (it != active_rebuilds_.end() && it->second->index_id == index_id) { + const auto& state = *it->second; + size_t processed = state.vectors_processed; + size_t total = state.total_vectors; + double percent = total > 0 ? (100.0 * processed / total) : 0.0; + nlohmann::json result = { + {"status", statusToString(state.status)}, + {"vectors_processed", processed}, + {"total_vectors", total}, + {"percent_complete", percent}, + {"started_at", timeToISO8601(state.started_at)} + }; + if (state.status == RebuildStatus::COMPLETED || state.status == RebuildStatus::FAILED) { + result["completed_at"] = timeToISO8601(state.completed_at); + } + if (state.status == RebuildStatus::FAILED && !state.error_message.empty()) { + result["error"] = state.error_message; + } + return result; + } + return {{"status", "idle"}}; +} + +std::string Rebuild::getTempPath(const std::string& index_dir) { + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx.temp"; +} + +std::string Rebuild::getTimestampedPath(const std::string& index_dir) { + auto ts = std::to_string( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch() + ).count() + ); + return index_dir + "/vectors/" + settings::DEFAULT_SUBINDEX + ".idx." 
+ ts; +} + +void Rebuild::executeJob(const RebuildJobParams& p, std::stop_token st) { + auto& entry = p.entry; // shared_ptr + auto* manager = p.manager; + try { + std::unique_lock op_lock(entry->operation_mutex); + + // Phase 1 — save current state before rebuilding + manager->saveIndexInternal(*entry); + + // Phase 2 — build new HNSW with updated M/ef_con + auto* old_alg = entry->alg.get(); + auto new_alg = std::make_unique>( + old_alg->getMaxElements(), old_alg->getSpaceType(), old_alg->getDimension(), + p.new_M, p.new_ef_con, + settings::RANDOM_SEED, old_alg->getQuantLevel(), old_alg->getChecksum()); + + // MUST wire fetchers before addPoint — searchBaseLayer needs this for base-layer-only nodes + IndexManager::wireVectorFetchers(new_alg.get(), entry->vector_storage); + + auto deleted_ids_vec = entry->id_mapper->getDeletedIds(SIZE_MAX); + std::unordered_set deleted_ids(deleted_ids_vec.begin(), deleted_ids_vec.end()); + auto cursor = entry->vector_storage->getCursor(); + const size_t batch_size = settings::RECOVERY_BATCH_SIZE; + size_t total_processed = 0; + size_t batches_since_checkpoint = 0; + constexpr size_t CHECKPOINT_INTERVAL = 5; + + while (cursor.hasNext()) { + if (st.stop_requested()) { + if (std::filesystem::exists(p.temp_path)) + std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, "Rebuild interrupted by server shutdown"); + return; + } + + std::vector>> batch; + batch.reserve(batch_size); + while (cursor.hasNext() && batch.size() < batch_size) { + auto [label, vec_bytes] = cursor.next(); + if (!vec_bytes.empty() && deleted_ids.count(label) == 0) + batch.emplace_back(label, std::move(vec_bytes)); + } + if (batch.empty()) break; + + IndexManager::parallelAddPoints(batch.size(), p.num_parallel_inserts, + [&](size_t i) { + const auto& [label, vec_bytes] = batch[i]; + new_alg->addPoint(vec_bytes.data(), label); + }); + + total_processed += batch.size(); + updateProgress(p.username, total_processed); + + if (++batches_since_checkpoint >= 
CHECKPOINT_INTERVAL) { + new_alg->saveIndex(p.temp_path); + batches_since_checkpoint = 0; + } + } + + if (st.stop_requested()) { + if (std::filesystem::exists(p.temp_path)) + std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, "Rebuild interrupted by server shutdown"); + return; + } + + // Phase 3 — persist to timestamped path, atomically rename to canonical path + new_alg->saveIndex(p.timestamped_path); + std::filesystem::rename(p.timestamped_path, p.index_path); + + if (std::filesystem::exists(p.temp_path)) std::filesystem::remove(p.temp_path); + + // new_alg is fully built and fetchers are already wired (line 194) — use directly + entry->alg = std::move(new_alg); + + // Update metadata (uses friend access to manager->metadata_manager_) + auto m = manager->metadata_manager_->getMetadata(entry->index_id); + if (m) { + m->M = p.new_M; + m->ef_con = p.new_ef_con; + m->total_elements = entry->alg->getElementsCount(); + manager->metadata_manager_->storeMetadata(entry->index_id, *m); + } + + entry->is_dirty = false; + + LOG_INFO(1801, entry->index_id, "Rebuild completed: " << total_processed << " vectors rebuilt"); + completeActiveRebuild(p.username); + + } catch (const std::exception& e) { + LOG_ERROR(1802, entry->index_id, "Rebuild failed: " << e.what()); + if (std::filesystem::exists(p.temp_path)) std::filesystem::remove(p.temp_path); + failActiveRebuild(p.username, e.what()); + } +} diff --git a/src/core/rebuild.hpp b/src/core/rebuild.hpp new file mode 100644 index 0000000000..4bd14a44ae --- /dev/null +++ b/src/core/rebuild.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "json/nlohmann_json.hpp" + +// Forward declarations — full definitions live in ndd.hpp, included by rebuild.cpp. 
+struct CacheEntry; +class IndexManager; + +enum class RebuildStatus : unsigned char { + IN_PROGRESS = 0, + COMPLETED = 1, + FAILED = 2 +}; + +struct ActiveRebuild { + std::string index_id; + RebuildStatus status{RebuildStatus::IN_PROGRESS}; + std::string error_message; + size_t vectors_processed{0}; + size_t total_vectors{0}; + std::chrono::system_clock::time_point started_at; + std::chrono::system_clock::time_point completed_at; + std::jthread thread; // jthread: built-in stop_token + auto-join on destruction +}; + +// Parameters passed to Rebuild::executeJob. `entry` and `manager` give executeJob +// direct access to graph config, vector storage, mutexes, save/metadata operations. +struct RebuildJobParams { + std::string username; + size_t new_M; + size_t new_ef_con; + + std::shared_ptr entry; // shared_ptr keeps CacheEntry alive for the rebuild duration + IndexManager* manager; // saveIndexInternal, metadata_manager_ (via friend) + + std::string temp_path; + std::string timestamped_path; + std::string index_path; + size_t num_parallel_inserts; +}; + +class Rebuild { +private: + std::unordered_map> active_rebuilds_; + mutable std::mutex rebuild_state_mutex_; + + static std::string statusToString(RebuildStatus s); + static std::string timeToISO8601(std::chrono::system_clock::time_point tp); + +public: + Rebuild() = default; + + void cleanupTempFiles(const std::string& data_dir); + + void setActiveRebuild(const std::string& username, const std::string& index_id, + size_t total_vectors); + void completeActiveRebuild(const std::string& username); + void failActiveRebuild(const std::string& username, const std::string& error); + bool hasActiveRebuild(const std::string& username) const; + void joinAllThreads(); + void attachRebuildThread(const std::string& username, std::jthread&& thread); + void updateProgress(const std::string& username, size_t processed); + nlohmann::json getProgress(const std::string& username, const std::string& index_id) const; + + static 
std::string getTempPath(const std::string& index_dir); + static std::string getTimestampedPath(const std::string& index_dir); + + void executeJob(const RebuildJobParams& p, std::stop_token st); +}; diff --git a/src/main.cpp b/src/main.cpp index 4654a54c20..05cbf21ac8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -692,6 +692,98 @@ int main(int argc, char** argv) { } }); + // ========== Rebuild operations ========== + + // Start index rebuild + CROW_ROUTE(app, "/api/v1/index//rebuild") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("POST"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + auto body = crow::json::load(req.body); + if (!body) { + return json_error(400, "Invalid JSON"); + } + + // Reject parameters that cannot be changed via rebuild + if (body.has("precision")) { + return json_error(400, "precision cannot be changed via rebuild"); + } + if (body.has("space_type")) { + return json_error(400, "space_type cannot be changed via rebuild"); + } + + // Get current metadata for defaults + auto meta = index_manager.getMetadata(index_id); + if (!meta) { + return json_error(404, "Index not found"); + } + // Parse parameters with current values as defaults + size_t new_M = body.has("M") ? (size_t)body["M"].i() : meta->M; + size_t new_ef_con = body.has("ef_con") ? 
(size_t)body["ef_con"].i() : meta->ef_con; + + // Validate M + if (new_M < settings::MIN_M || new_M > settings::MAX_M) { + return json_error(400, + "M must be between " + std::to_string(settings::MIN_M) + + " and " + std::to_string(settings::MAX_M)); + } + + // Validate ef_con + if (new_ef_con < settings::MIN_EF_CONSTRUCT || new_ef_con > settings::MAX_EF_CONSTRUCT) { + return json_error(400, + "ef_con must be between " + std::to_string(settings::MIN_EF_CONSTRUCT) + + " and " + std::to_string(settings::MAX_EF_CONSTRUCT)); + } + + // Use live count — meta->total_elements can be stale if not yet flushed to disk + size_t actual_element_count = index_manager.getElementCount(index_id); + if (actual_element_count == 0) { + return json_error(400, "Cannot rebuild an empty index"); + } + + try { + auto result = index_manager.rebuildIndexAsync(index_id, new_M, new_ef_con); + + if (result.code == 1) return json_error(404, result.message); + if (result.code == 2) return json_error(409, result.message); + if (result.code == 3) return json_error(400, result.message); + + crow::json::wvalue response; + response["status"] = "rebuilding"; + response["previous_config"]["M"] = meta->M; + response["previous_config"]["ef_con"] = meta->ef_con; + response["new_config"]["M"] = new_M; + response["new_config"]["ef_con"] = new_ef_con; + response["total_vectors"] = actual_element_count; + return crow::response(202, response.dump()); + } catch (const std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + + // Get rebuild status + CROW_ROUTE(app, "/api/v1/index//rebuild/status") + .CROW_MIDDLEWARES(app, AuthMiddleware) + .methods("GET"_method)([&index_manager, &app](const crow::request& req, + const std::string& index_name) { + auto& ctx = app.get_context(req); + std::string index_id = ctx.username + "/" + index_name; + + try { + auto progress = index_manager.getRebuildProgress(ctx.username, index_id); + crow::response res; + res.code = 200; + 
res.set_header("Content-Type", "application/json"); + res.body = progress.dump(); + return res; + } catch (const std::exception& e) { + return json_error_500(ctx.username, index_name, req.url, e.what()); + } + }); + // List indexes for current user CROW_ROUTE(app, "/api/v1/index/list") .CROW_MIDDLEWARES(app, AuthMiddleware) diff --git a/src/utils/settings.hpp b/src/utils/settings.hpp index 9949e9109e..07210e7bc9 100644 --- a/src/utils/settings.hpp +++ b/src/utils/settings.hpp @@ -5,6 +5,7 @@ #include #include #include +#include constexpr uint64_t KB = (1024ULL); constexpr uint64_t MB = (1024ULL * KB); diff --git a/src/utils/types.hpp b/src/utils/types.hpp new file mode 100644 index 0000000000..e45d13678f --- /dev/null +++ b/src/utils/types.hpp @@ -0,0 +1,11 @@ +#pragma once +#include + +// Generic operation result returned by async and sync operations. +// Each function documents its return codes in comments above its declaration. +// Code 0 always means success. Non-zero codes are operation-specific. +// Codes can be conglomerated into ENUMs per operation as the codebase matures. 
+struct OperationResult { + unsigned char code; // 0 = success, non-zero = error (operation-specific) + std::string message; +}; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0793a2e2f3..ccd5019366 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -3,6 +3,7 @@ include(FetchContent) FetchContent_Declare( googletest URL https://github.com/google/googletest/archive/refs/tags/v1.14.0.zip + DOWNLOAD_EXTRACT_TIMESTAMP TRUE ) # For Windows: Prevent overriding the parent project's compiler/linker settings set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) @@ -38,3 +39,36 @@ target_compile_definitions(ndd_filter_test PRIVATE MDB_MAXKEYSIZE=512) include(GoogleTest) gtest_discover_tests(ndd_filter_test) + +# --- ndd_rebuild_test --- +add_executable(ndd_rebuild_test rebuild_test.cpp ${LMDB_SOURCES} ${ROARING_SOURCE}) + +set_source_files_properties(${LMDB_SOURCES} PROPERTIES + COMPILE_FLAGS "-DMDBX_BUILD_SHARED_LIBRARY=0 -DMDBX_BUILD_FLAGS=\\\"NDD_EMBEDDED\\\"" +) + +target_include_directories(ndd_rebuild_test PRIVATE + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_SOURCE_DIR}/src/core + ${CMAKE_SOURCE_DIR}/src/utils + ${CMAKE_SOURCE_DIR}/src/server + ${CMAKE_SOURCE_DIR}/src/storage + ${CMAKE_SOURCE_DIR}/src/filter + ${CMAKE_SOURCE_DIR}/src/sparse + ${CMAKE_SOURCE_DIR}/src/hnsw + ${CMAKE_SOURCE_DIR}/src/quant + ${CMAKE_SOURCE_DIR}/third_party + ${CMAKE_SOURCE_DIR}/third_party/json + ${CMAKE_SOURCE_DIR}/third_party/msgpack/include + ${LIBARCHIVE_INCLUDE_DIR} +) + +target_link_libraries(ndd_rebuild_test + PRIVATE + ndd_core + GTest::gtest_main +) + +target_compile_definitions(ndd_rebuild_test PRIVATE MDB_MAXKEYSIZE=512) + +gtest_discover_tests(ndd_rebuild_test) diff --git a/tests/README.md b/tests/README.md index a62ef40998..a5a04a2c0a 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,19 +1,83 @@ # Tests -This folder contains unit tests for Endee. +Unit tests for Endee. Currently two test suites: filter and rebuild. 
-## Build & Run +## Build & Run All Tests -From the repository root: + cmake -S . -B build -DENABLE_TESTING=ON -DUSE_NEON=ON # Apple Silicon + cmake -S . -B build -DENABLE_TESTING=ON -DUSE_AVX2=ON # Intel/AMD + cmake --build build + ctest --test-dir build --output-on-failure -1. Configure with tests enabled: - - `cmake -S . -B build -DENABLE_TESTING=ON` -2. Build the test target: - - `cmake --build build --target ndd_filter_test` -3. Run: - - `./build/tests/ndd_filter_test` +## ndd_filter_test + +Tests for the filter subsystem (categorical, numeric, boolean filtering). + +Build and run individually: + + cmake --build build --target ndd_filter_test + ./build/tests/ndd_filter_test + +Test cases: +- BucketTest: bucket serialization and deserialization +- FilterTest/CategoryFilterBasics: string category filter add and query +- FilterTest/BooleanFilterBasics: boolean filter via JSON input +- FilterTest/NumericFilterBasics: integer range queries +- FilterTest/FloatNumericFilter: float range queries +- FilterTest/MixedAndLogic: AND logic across multiple fields +- FilterTest/InOperator: $in operator with multiple values +- FilterTest/DeleteFilter: removal of categorical filters +- FilterTest/NumericDelete: removal of numeric filters + +## ndd_rebuild_test + +Unit and integration tests for the rebuild subsystem. 
+ +Build and run individually: + + cmake --build build --target ndd_rebuild_test + ./build/tests/ndd_rebuild_test + +Test cases: + +State management (Rebuild class in isolation): +- RebuildStateTest/NoRebuild_HasActiveIsFalse +- RebuildStateTest/NoRebuild_GetProgressIsIdle +- RebuildStateTest/SetActive_HasActiveIsTrue +- RebuildStateTest/SetActive_GetProgressShowsInProgress +- RebuildStateTest/UpdateProgress_ReflectedInGetProgress +- RebuildStateTest/PercentComplete_CalculatedCorrectly +- RebuildStateTest/PercentComplete_ZeroTotal_IsZero +- RebuildStateTest/Complete_StatusIsCompleted +- RebuildStateTest/Complete_HasActiveIsFalse +- RebuildStateTest/Complete_CompletedAtPresent +- RebuildStateTest/Fail_StatusIsFailed +- RebuildStateTest/Fail_HasActiveIsFalse +- RebuildStateTest/Fail_ErrorMessagePresent +- RebuildStateTest/Fail_CompletedAtPresent +- RebuildStateTest/TwoUsers_IndependentState +- RebuildStateTest/GetProgress_WrongIndex_ReturnsIdle +- RebuildStateTest/SetActive_OverwritesPreviousCompleted + +Temp file cleanup and path helpers: +- RebuildCleanupTest/CleanupTempFiles_NonExistentDir_NoOp +- RebuildCleanupTest/CleanupTempFiles_RemovesTempFile +- RebuildCleanupTest/CleanupTempFiles_RemovesTimestampedFile +- RebuildCleanupTest/CleanupTempFiles_LeavesCanonicalIndex +- RebuildCleanupTest/CleanupTempFiles_EmptyDir_NoOp +- RebuildPathTest/GetTempPath_Format +- RebuildPathTest/GetTimestampedPath_HasTimestamp + +End-to-end rebuild via IndexManager: +- RebuildIntegrationTest/RebuildAsync_ReturnSuccessCode +- RebuildIntegrationTest/RebuildCompletes_ConfigUpdated +- RebuildIntegrationTest/RebuildCompletes_VectorCountPreserved +- RebuildIntegrationTest/RebuildWhileInProgress_Returns409Code +- RebuildIntegrationTest/RebuildNonExistentIndex_Returns404Code +- RebuildIntegrationTest/RebuildNoChange_Returns400Code ## Notes -- Tests can also be built in a dedicated tests build directory (e.g., `tests/build/`). +- Tests use real file I/O and real MDBX databases — no mocking. 
+- Each test creates its own temp directory and removes it on teardown. - The `tests/build/` directory is ignored by git. diff --git a/tests/filter_test.cpp b/tests/filter_test.cpp index 101be3403e..f75d51ed15 100644 --- a/tests/filter_test.cpp +++ b/tests/filter_test.cpp @@ -37,7 +37,7 @@ class FilterTest : public ::testing::Test { } // Initialize Filter - filter = std::make_unique<Filter>(db_path); + filter = std::make_unique<Filter>(db_path, "testuser/testidx"); } void TearDown() override { diff --git a/tests/rebuild_test.cpp b/tests/rebuild_test.cpp new file mode 100644 index 0000000000..e3a8a38f6a --- /dev/null +++ b/tests/rebuild_test.cpp @@ -0,0 +1,328 @@ +#include +#include +#include +#include +#include + +#include "rebuild.hpp" +#include "ndd.hpp" +#include "utils/msgpack_ndd.hpp" +#include "server/auth.hpp" + +namespace fs = std::filesystem; + +// ============================================================ +// Layer 1 — Rebuild state management (no IndexManager needed) +// ============================================================ + +class RebuildStateTest : public ::testing::Test { +protected: + Rebuild rebuild; +}; + +TEST_F(RebuildStateTest, NoRebuild_HasActiveIsFalse) { + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, NoRebuild_GetProgressIsIdle) { + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "idle"); +} + +TEST_F(RebuildStateTest, SetActive_HasActiveIsTrue) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + EXPECT_TRUE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, SetActive_GetProgressShowsInProgress) { + rebuild.setActiveRebuild("alice", "alice/idx", 200); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "in_progress"); + EXPECT_EQ(p["total_vectors"], 200); + EXPECT_EQ(p["vectors_processed"], 0); +} + +TEST_F(RebuildStateTest, UpdateProgress_ReflectedInGetProgress) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + 
rebuild.updateProgress("alice", 50); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["vectors_processed"], 50); +} + +TEST_F(RebuildStateTest, PercentComplete_CalculatedCorrectly) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.updateProgress("alice", 50); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_DOUBLE_EQ(p["percent_complete"].get<double>(), 50.0); +} + +TEST_F(RebuildStateTest, PercentComplete_ZeroTotal_IsZero) { + rebuild.setActiveRebuild("alice", "alice/idx", 0); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_DOUBLE_EQ(p["percent_complete"].get<double>(), 0.0); +} + +TEST_F(RebuildStateTest, Complete_StatusIsCompleted) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "completed"); +} + +TEST_F(RebuildStateTest, Complete_HasActiveIsFalse) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, Complete_CompletedAtPresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_TRUE(p.contains("completed_at")); +} + +TEST_F(RebuildStateTest, Fail_StatusIsFailed) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "failed"); +} + +TEST_F(RebuildStateTest, Fail_HasActiveIsFalse) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); +} + +TEST_F(RebuildStateTest, Fail_ErrorMessagePresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "disk full"); + auto p = 
rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["error"], "disk full"); +} + +TEST_F(RebuildStateTest, Fail_CompletedAtPresent) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.failActiveRebuild("alice", "oom"); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_TRUE(p.contains("completed_at")); +} + +TEST_F(RebuildStateTest, TwoUsers_IndependentState) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + EXPECT_TRUE(rebuild.hasActiveRebuild("alice")); + EXPECT_FALSE(rebuild.hasActiveRebuild("bob")); + rebuild.setActiveRebuild("bob", "bob/idx", 50); + EXPECT_TRUE(rebuild.hasActiveRebuild("bob")); + rebuild.completeActiveRebuild("alice"); + EXPECT_FALSE(rebuild.hasActiveRebuild("alice")); + EXPECT_TRUE(rebuild.hasActiveRebuild("bob")); +} + +TEST_F(RebuildStateTest, GetProgress_WrongIndex_ReturnsIdle) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + auto p = rebuild.getProgress("alice", "alice/other"); + EXPECT_EQ(p["status"], "idle"); +} + +TEST_F(RebuildStateTest, SetActive_OverwritesPreviousCompleted) { + rebuild.setActiveRebuild("alice", "alice/idx", 100); + rebuild.completeActiveRebuild("alice"); + rebuild.setActiveRebuild("alice", "alice/idx", 200); + auto p = rebuild.getProgress("alice", "alice/idx"); + EXPECT_EQ(p["status"], "in_progress"); + EXPECT_EQ(p["total_vectors"], 200); +} + +// ============================================================ +// Layer 2 — Temp file cleanup and path helpers +// ============================================================ + +class RebuildCleanupTest : public ::testing::Test { +protected: + std::string dir_; + Rebuild rebuild_; + + void SetUp() override { + dir_ = "./test_rebuild_cleanup_" + std::to_string(rand()); + fs::create_directories(dir_ + "/user/idx/vectors"); + } + + void TearDown() override { + if (fs::exists(dir_)) fs::remove_all(dir_); + } + + void touch(const std::string& rel_path) { + std::ofstream f(dir_ + "/" + rel_path); + f << "x"; + } + + bool 
exists(const std::string& rel_path) { + return fs::exists(dir_ + "/" + rel_path); + } +}; + +TEST_F(RebuildCleanupTest, CleanupTempFiles_NonExistentDir_NoOp) { + EXPECT_NO_THROW(rebuild_.cleanupTempFiles("/nonexistent/path/xyz")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_RemovesTempFile) { + touch("user/idx/vectors/default.idx.temp"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_FALSE(exists("user/idx/vectors/default.idx.temp")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_RemovesTimestampedFile) { + touch("user/idx/vectors/default.idx.1714900000"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_FALSE(exists("user/idx/vectors/default.idx.1714900000")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_LeavesCanonicalIndex) { + touch("user/idx/vectors/default.idx"); + rebuild_.cleanupTempFiles(dir_); + EXPECT_TRUE(exists("user/idx/vectors/default.idx")); +} + +TEST_F(RebuildCleanupTest, CleanupTempFiles_EmptyDir_NoOp) { + EXPECT_NO_THROW(rebuild_.cleanupTempFiles(dir_)); +} + +TEST(RebuildPathTest, GetTempPath_Format) { + auto path = Rebuild::getTempPath("/data/user/idx"); + EXPECT_EQ(path, "/data/user/idx/vectors/default.idx.temp"); +} + +TEST(RebuildPathTest, GetTimestampedPath_HasTimestamp) { + auto path = Rebuild::getTimestampedPath("/data/user/idx"); + // Should match /data/user/idx/vectors/default.idx.<timestamp> 
+ std::string prefix = "/data/user/idx/vectors/default.idx."; + ASSERT_GT(path.size(), prefix.size()); + EXPECT_EQ(path.substr(0, prefix.size()), prefix); + std::string suffix = path.substr(prefix.size()); + EXPECT_FALSE(suffix.empty()); + EXPECT_TRUE(std::all_of(suffix.begin(), suffix.end(), ::isdigit)); +} + +// ============================================================ +// Layer 3 — Integration tests via IndexManager +// ============================================================ + +class RebuildIntegrationTest : public ::testing::Test { +protected: + static constexpr const char* USERNAME = "testuser"; + static constexpr const char* IDX_NAME = "testidx"; + static constexpr const char* INDEX_ID = "testuser/testidx"; + static constexpr size_t DIM = 32; + static constexpr size_t N_VECTORS = 100; + + std::string data_dir_; + std::unique_ptr<IndexManager> manager_; + + void SetUp() override { + data_dir_ = "./test_rebuild_integration_" + std::to_string(rand()); + fs::create_directories(data_dir_); + PersistenceConfig pcfg; + pcfg.save_on_shutdown = false; + manager_ = std::make_unique<IndexManager>(data_dir_, pcfg); + } + + void TearDown() override { + manager_.reset(); + if (fs::exists(data_dir_)) fs::remove_all(data_dir_); + } + + void createTestIndex(size_t M = 8, size_t ef_con = 64) { + IndexConfig config{ + .dim = DIM, + .max_elements = 1000, + .space_type_str = "cosine", + .M = M, + .ef_construction = ef_con, + .quant_level = ndd::quant::QuantizationLevel::FP32, + .checksum = 0 + }; + manager_->createIndex(INDEX_ID, config, UserType::Admin, 0); + } + + void insertVectors(size_t n = N_VECTORS) { + std::vector<ndd::HybridVectorObject> vecs; + vecs.reserve(n); + for (size_t i = 0; i < n; ++i) { + ndd::HybridVectorObject v; + v.id = "vec_" + std::to_string(i); + v.vector.resize(DIM); + for (size_t d = 0; d < DIM; ++d) + v.vector[d] = static_cast<float>(rand()) / RAND_MAX; + vecs.push_back(std::move(v)); + } + manager_->addVectors(INDEX_ID, vecs); + } + + // Returns true if rebuild completed successfully within 
timeout_sec. + bool waitForRebuild(int timeout_sec = 10) { + auto deadline = std::chrono::steady_clock::now() + + std::chrono::seconds(timeout_sec); + while (std::chrono::steady_clock::now() < deadline) { + auto progress = manager_->getRebuildProgress(USERNAME, INDEX_ID); + std::string status = progress.value("status", ""); + if (status == "completed") return true; + if (status == "failed") return false; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + return false; + } +}; + +TEST_F(RebuildIntegrationTest, RebuildAsync_ReturnSuccessCode) { + createTestIndex(); + insertVectors(); + auto result = manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + EXPECT_EQ(result.code, 0); + waitForRebuild(); +} + +TEST_F(RebuildIntegrationTest, RebuildCompletes_ConfigUpdated) { + createTestIndex(8, 64); + insertVectors(); + manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + ASSERT_TRUE(waitForRebuild()); + auto meta = manager_->getMetadata(INDEX_ID); + ASSERT_TRUE(meta.has_value()); + EXPECT_EQ(meta->M, 16u); + EXPECT_EQ(meta->ef_con, 128u); +} + +TEST_F(RebuildIntegrationTest, RebuildCompletes_VectorCountPreserved) { + createTestIndex(); + insertVectors(N_VECTORS); + size_t before = manager_->getElementCount(INDEX_ID); + manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + ASSERT_TRUE(waitForRebuild()); + size_t after = manager_->getElementCount(INDEX_ID); + EXPECT_EQ(before, after); +} + +TEST_F(RebuildIntegrationTest, RebuildWhileInProgress_Returns409Code) { + createTestIndex(); + insertVectors(); + // setActiveRebuild is synchronous — second call sees IN_PROGRESS before thread starts + auto r1 = manager_->rebuildIndexAsync(INDEX_ID, 16, 128); + ASSERT_EQ(r1.code, 0); + auto r2 = manager_->rebuildIndexAsync(INDEX_ID, 32, 256); + EXPECT_EQ(r2.code, 2); + waitForRebuild(); +} + +TEST_F(RebuildIntegrationTest, RebuildNonExistentIndex_Returns404Code) { + auto result = manager_->rebuildIndexAsync("testuser/doesnotexist", 16, 128); + EXPECT_EQ(result.code, 1); +} + 
+TEST_F(RebuildIntegrationTest, RebuildNoChange_Returns400Code) { + createTestIndex(8, 64); + insertVectors(); + auto result = manager_->rebuildIndexAsync(INDEX_ID, 8, 64); + EXPECT_EQ(result.code, 3); +}