diff --git a/.github/workflows/01-ci-pipeline.yml b/.github/workflows/01-ci-pipeline.yml index 524762f6..4259d1ac 100644 --- a/.github/workflows/01-ci-pipeline.yml +++ b/.github/workflows/01-ci-pipeline.yml @@ -92,6 +92,14 @@ jobs: os: ubuntu-24.04 compiler: clang + build-and-test-linux-riscv64: + name: Build & Test (linux-riscv64) + needs: lint + uses: ./.github/workflows/07-linux-riscv-build.yml + with: + platform: linux-riscv64 + os: ubuntu-24.04-riscv + build-android: name: Build & Test (android) needs: [lint, clang-tidy] @@ -106,3 +114,8 @@ jobs: name: Build & Test (iOS) needs: [lint, clang-tidy] uses: ./.github/workflows/06-ios-build.yml + + build-and-test-linux-riscv64-rvv: + name: Build & Test (linux-riscv64-rvv) + needs: lint + uses: ./.github/workflows/08-linux-riscv-rvv-cloud-v-build.yml diff --git a/.github/workflows/03-macos-linux-build.yml b/.github/workflows/03-macos-linux-build.yml index 1dd1e45c..7543c911 100644 --- a/.github/workflows/03-macos-linux-build.yml +++ b/.github/workflows/03-macos-linux-build.yml @@ -21,7 +21,6 @@ permissions: contents: read jobs: - # Build and test matrix (parallel execution) build-and-test: name: Build & Test (${{ inputs.platform }}) runs-on: ${{ inputs.os }} @@ -32,7 +31,7 @@ jobs: include: - os: ${{ inputs.os }} platform: ${{ inputs.platform }} - arch_flag: "" # Use appropriate architecture + arch_flag: "" steps: - name: Checkout code @@ -61,7 +60,6 @@ jobs: - name: Set up environment variables run: | - # Set number of processors for parallel builds if [[ "${{ matrix.platform }}" == "macos-arm64" ]]; then NPROC=$(sysctl -n hw.ncpu 2>/dev/null || echo 2) else @@ -70,19 +68,18 @@ jobs: echo "NPROC=$NPROC" >> $GITHUB_ENV echo "Using $NPROC parallel jobs for builds" - # Set compiler when clang is requested if [[ "${{ inputs.compiler }}" == "clang" ]]; then echo "CC=clang" >> $GITHUB_ENV echo "CXX=clang++" >> $GITHUB_ENV fi - # Add Python user base bin to PATH for pip-installed CLI tools echo "$(python -c 'import site; print(site.USER_BASE)')/bin" >> $GITHUB_PATH shell: bash - name: Install dependencies run: | - python -m pip install --upgrade pip \ + python -m pip install --upgrade pip + python -m pip install \ pybind11==3.0 \ cmake==3.30.0 \ ninja==1.11.1 \ @@ -138,4 +135,4 @@ jobs: ./c_api_field_schema_example ./c_api_index_example ./c_api_optimized_example - shell: bash \ No newline at end of file + shell: bash diff --git a/.github/workflows/07-linux-riscv-build.yml b/.github/workflows/07-linux-riscv-build.yml new file mode 100644 index 00000000..4f1f4c66 --- /dev/null +++ b/.github/workflows/07-linux-riscv-build.yml @@ -0,0 +1,257 @@ +name: Linux RISC-V Build + +on: + workflow_call: + +permissions: + contents: read + +env: + RISE_PYPI: https://gitlab.com/api/v4/projects/56254198/packages/pypi/simple + PIP_BREAK_SYSTEM_PACKAGES: 1 + +jobs: + build: + name: Build (linux-riscv64) + runs-on: ubuntu-24.04-riscv + + steps: + - name: Checkout code + uses: actions/checkout@v6 + with: + submodules: recursive + + - name: Install build dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + python3-pybind11 pybind11-dev + shell: bash + + - name: Build from source + run: | + cd "$GITHUB_WORKSPACE" + NPROC=$(nproc 2>/dev/null || echo 2) + echo "Using $NPROC parallel jobs for builds" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON + make -C build -j"$NPROC" + shell: bash + + - name: Archive entire workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -cf linux-riscv64-workspace.tar . + shell: bash + + - name: Upload workspace artifacts + uses: actions/upload-artifact@v7 + with: + name: linux-riscv64-workspace + path: ${{ github.workspace }}/linux-riscv64-workspace.tar + if-no-files-found: error + + cpp-tests: + name: C++ Tests + runs-on: ubuntu-24.04-riscv + needs: build + + steps: + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: linux-riscv64-workspace + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf linux-riscv64-workspace.tar + shell: bash + + - name: Install test dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + python3-pybind11 pybind11-dev libgtest-dev liburing-dev + shell: bash + + - name: Reconfigure build directory + run: | + cd "$GITHUB_WORKSPACE" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON + shell: bash + + - name: Run C++ Tests + run: | + cd "$GITHUB_WORKSPACE/build" + NPROC=$(nproc 2>/dev/null || echo 2) + make unittest -j"$NPROC" + shell: bash + + python-tests: + name: Python Tests + runs-on: ubuntu-24.04-riscv + needs: build + + steps: + - name: Select Python + run: | + if command -v python3 >/dev/null 2>&1; then + PYTHON_BIN=python3 + elif command -v python >/dev/null 2>&1; then + PYTHON_BIN=python + else + echo "No local Python interpreter found on PATH" + exit 1 + fi + "$PYTHON_BIN" --version + echo "PYTHON=$PYTHON_BIN" >> "$GITHUB_ENV" + shell: bash + + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: linux-riscv64-workspace + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf linux-riscv64-workspace.tar + echo "$($PYTHON -c 'import site; print(site.USER_BASE)')/bin" >> "$GITHUB_PATH" + shell: bash + + - name: Install dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + libgtest-dev liburing-dev + + $PYTHON -m pip install --upgrade pip + $PYTHON -m pip install numpy==2.2.2 cmake==3.30.0 ninja==1.11.1.1 --index-url "$RISE_PYPI" + $PYTHON -m pip install pybind11==3.0 pytest scikit-build-core setuptools_scm + shell: bash + + - name: Reconfigure build directory + run: | + cd "$GITHUB_WORKSPACE" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON \ + -Dpybind11_DIR="$($PYTHON -c 'import pybind11; print(pybind11.get_cmake_dir())')" + shell: bash + + - name: Install from existing build directory + run: | + cd "$GITHUB_WORKSPACE" + NPROC=$(nproc 2>/dev/null || echo 2) + export SKBUILD_BUILD_DIR="$GITHUB_WORKSPACE/build" + CMAKE_GENERATOR="Unix Makefiles" \ + CMAKE_BUILD_PARALLEL_LEVEL="$NPROC" \ + $PYTHON -m pip install -v . \ + --no-build-isolation \ + --config-settings='cmake.define.BUILD_TOOLS="ON"' + shell: bash + + - name: Run Python Tests + run: | + cd "$GITHUB_WORKSPACE" + $PYTHON -m pytest python/tests/ + shell: bash + + cpp-examples: + name: C++ Examples + runs-on: ubuntu-24.04-riscv + needs: build + + steps: + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: linux-riscv64-workspace + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf linux-riscv64-workspace.tar + shell: bash + + - name: Run C++ Examples + run: | + cd "$GITHUB_WORKSPACE/examples/c++" + NPROC=$(nproc 2>/dev/null || echo 2) + mkdir -p build && cd build + cmake .. -DCMAKE_BUILD_TYPE=Release + make -j "$NPROC" + ./db-example + ./core-example + ./ailego-example + shell: bash + + c-examples: + name: C Examples + runs-on: ubuntu-24.04-riscv + needs: build + + steps: + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: linux-riscv64-workspace + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf linux-riscv64-workspace.tar + shell: bash + + - name: Run C Examples + run: | + cd "$GITHUB_WORKSPACE/examples/c" + NPROC=$(nproc 2>/dev/null || echo 2) + mkdir -p build && cd build + cmake .. -DCMAKE_BUILD_TYPE=Release + make -j "$NPROC" + ./c_api_basic_example + ./c_api_collection_schema_example + ./c_api_doc_example + ./c_api_field_schema_example + ./c_api_index_example + ./c_api_optimized_example + shell: bash \ No newline at end of file diff --git a/.github/workflows/08-linux-riscv-rvv-cloud-v-build.yml b/.github/workflows/08-linux-riscv-rvv-cloud-v-build.yml new file mode 100644 index 00000000..73a482c7 --- /dev/null +++ b/.github/workflows/08-linux-riscv-rvv-cloud-v-build.yml @@ -0,0 +1,207 @@ +name: Linux RISC-V RVV Cloud-V Build + +on: + workflow_call: + +permissions: + contents: read + +env: + RISE_PYPI: https://gitlab.com/api/v4/projects/56254198/packages/pypi/simple + PIP_BREAK_SYSTEM_PACKAGES: 1 + WORKSPACE_ARTIFACT: linux-riscv64-rvv-banana-pi-f3-workspace + RISCV_CMAKE_ARGS: >- + -DENABLE_RISCV64=ON + -DENABLE_RISCV_VECTOR=ON + -DENABLE_RISCV_ZVFH=ON + -DENABLE_RISCV_ZIHINTPAUSE=ON + +jobs: + build: + name: Build (linux-riscv64-rvv) + runs-on: banana-pi-f3 + + steps: + - name: Checkout code + uses: actions/checkout@v6 + with: + submodules: recursive + + - name: Install build dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + python3-pybind11 pybind11-dev + shell: bash + + - name: Build from source + run: | + cd "$GITHUB_WORKSPACE" + NPROC=$(nproc 2>/dev/null || echo 2) + echo "Using $NPROC parallel jobs for builds" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON \ + $RISCV_CMAKE_ARGS + make -C build -j"$NPROC" + shell: bash + + - name: Archive entire workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -cf "${WORKSPACE_ARTIFACT}.tar" . + shell: bash + + - name: Upload workspace artifacts + uses: actions/upload-artifact@v7 + with: + name: ${{ env.WORKSPACE_ARTIFACT }} + path: ${{ github.workspace }}/${{ env.WORKSPACE_ARTIFACT }}.tar + if-no-files-found: error + + cpp-tests: + name: C++ Tests (linux-riscv64-rvv) + runs-on: banana-pi-f3 + needs: build + + steps: + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: ${{ env.WORKSPACE_ARTIFACT }} + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf "${WORKSPACE_ARTIFACT}.tar" + shell: bash + + - name: Install test dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + python3-pybind11 pybind11-dev libgtest-dev liburing-dev + shell: bash + + - name: Reconfigure build directory + run: | + cd "$GITHUB_WORKSPACE" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON \ + $RISCV_CMAKE_ARGS + shell: bash + + - name: Run C++ Tests + run: | + cd "$GITHUB_WORKSPACE/build" + NPROC=$(nproc 2>/dev/null || echo 2) + make unittest -j"$NPROC" + shell: bash + + python-tests: + name: Python Tests (linux-riscv64-rvv) + runs-on: banana-pi-f3 + needs: build + + steps: + - name: Select Python + run: | + if command -v python3 >/dev/null 2>&1; then + PYTHON_BIN=python3 + elif command -v python >/dev/null 2>&1; then + PYTHON_BIN=python + else + echo "No local Python interpreter found on PATH" + exit 1 + fi + "$PYTHON_BIN" --version + echo "PYTHON=$PYTHON_BIN" >> "$GITHUB_ENV" + shell: bash + + - name: Download workspace artifacts + uses: actions/download-artifact@v8 + with: + name: ${{ env.WORKSPACE_ARTIFACT }} + path: ${{ github.workspace }} + + - name: Extract workspace + run: | + cd "$GITHUB_WORKSPACE" + tar -xf "${WORKSPACE_ARTIFACT}.tar" + echo "$($PYTHON -c 'import site; print(site.USER_BASE)')/bin" >> "$GITHUB_PATH" + shell: bash + + - name: Install dependencies + run: | + sudo mkdir -p /var/lib/dpkg/updates + sudo mkdir -p /var/lib/apt/lists/ + sudo mkdir -p /var/cache/apt/archives/ + sudo touch /var/lib/dpkg/status + sudo apt-get purge -y byobu || true + sudo apt-get update -o Dpkg::Lock::Timeout=300 + sudo DEBIAN_FRONTEND=noninteractive apt-get install -yq \ + -o Dpkg::Options::="--force-confdef" \ + -o Dpkg::Options::="--force-confold" \ + -o Dpkg::Lock::Timeout=300 \ + libgtest-dev liburing-dev + + $PYTHON -m pip install --upgrade pip + $PYTHON -m pip install numpy==2.2.2 cmake==3.30.0 ninja==1.11.1.1 --index-url "$RISE_PYPI" + $PYTHON -m pip install pybind11==3.0 pytest scikit-build-core setuptools_scm + shell: bash + + - name: Reconfigure build directory + run: | + cd "$GITHUB_WORKSPACE" + cmake -S . -B build \ + -G "Unix Makefiles" \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_TOOLS=ON \ + -DBUILD_PYTHON_BINDINGS=ON \ + $RISCV_CMAKE_ARGS \ + -Dpybind11_DIR="$($PYTHON -c 'import pybind11; print(pybind11.get_cmake_dir())')" + shell: bash + + - name: Install from existing build directory + run: | + cd "$GITHUB_WORKSPACE" + NPROC=$(nproc 2>/dev/null || echo 2) + export SKBUILD_BUILD_DIR="$GITHUB_WORKSPACE/build" + CMAKE_GENERATOR="Unix Makefiles" \ + CMAKE_BUILD_PARALLEL_LEVEL="$NPROC" \ + $PYTHON -m pip install -v . \ + --no-build-isolation \ + --config-settings='cmake.define.BUILD_TOOLS="ON"' \ + --config-settings='cmake.define.ENABLE_RISCV64="ON"' \ + --config-settings='cmake.define.ENABLE_RISCV_VECTOR="ON"' \ + --config-settings='cmake.define.ENABLE_RISCV_ZVFH="ON"' \ + --config-settings='cmake.define.ENABLE_RISCV_ZIHINTPAUSE="ON"' + shell: bash + + - name: Run Python Tests + run: | + cd "$GITHUB_WORKSPACE" + $PYTHON -m pytest python/tests/ + shell: bash diff --git a/src/ailego/buffer/block_eviction_queue.cc b/src/ailego/buffer/block_eviction_queue.cc new file mode 100644 index 00000000..6f24b353 --- /dev/null +++ b/src/ailego/buffer/block_eviction_queue.cc @@ -0,0 +1,153 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace zvec { +namespace ailego { + +int BlockEvictionQueue::init() { + evict_batch_size_ = 512; + for (size_t i = 0; i < CACHE_QUEUE_NUM; i++) { + evict_queues_.push_back(ConcurrentQueue(evict_batch_size_ * 200)); + } + return 0; +} + +bool BlockEvictionQueue::evict_single_block(BlockType &item) { + bool found = false; + for (size_t i = 0; i < CACHE_QUEUE_NUM; i++) { + found = evict_queues_[i].try_dequeue(item); + if (found) { + break; + } + } + return found; +} + +bool BlockEvictionQueue::is_valid_and_alive(const BlockType &item) { + std::shared_lock lock(valid_page_tables_mutex_); + if (valid_page_tables_.find(item.page_table) == valid_page_tables_.end()) { + return false; + } + // is_dead_block accesses entries_ under the same shared lock, so the + // VectorPageTable destructor (which holds the unique lock via set_invalid) + // cannot free entries_ while this check is in progress. + return !item.page_table->is_dead_block(item); +} + +bool BlockEvictionQueue::evict_block(BlockType &item) { + bool ok = false; + do { + ok = evict_single_block(item); + if (!ok) { + return false; + } + if (item.page_table == nullptr) { + if (!ParquetBufferPool::get_instance().is_dead_node(item)) { + break; + } else { + continue; + } + } + } while (!is_valid_and_alive(item)); + return ok; +} + +void BlockEvictionQueue::recycle() { + BlockType item; + while (MemoryLimitPool::get_instance().is_full() && evict_block(item)) { + if (item.page_table) { + std::shared_lock lock(valid_page_tables_mutex_); + if (valid_page_tables_.find(item.page_table) != + valid_page_tables_.end()) { + item.page_table->evict_block(item.vector_block.first); + } + } else { + ParquetBufferPool::get_instance().evict(item.parquet_buffer_block.first); + } + } +} + +bool BlockEvictionQueue::add_single_block(const BlockType &block, + int queue_index) { + bool ok = evict_queues_[queue_index].enqueue(block); + if (!ok) { + LOG_ERROR("enqueue failed."); + return false; + } + return true; +} + +int MemoryLimitPool::init(size_t pool_size) { + pool_size_ = 0; + BlockEvictionQueue::get_instance().recycle(); + pool_size_ = pool_size; + LOG_INFO("MemoryLimitPool initialized with pool size: %lu", pool_size_); + return 0; +} + +bool MemoryLimitPool::try_acquire_buffer(const size_t buffer_size, + char *&buffer) { + size_t expected, desired; + do { + expected = used_size_.load(); + if (expected >= pool_size_) { + return false; + } + desired = expected + buffer_size; + } while (!used_size_.compare_exchange_weak(expected, desired)); + buffer = (char *)ailego_aligned_malloc(buffer_size, 4096); + if (!buffer) { + used_size_.fetch_sub(buffer_size); + return false; + } + return true; +} + +void MemoryLimitPool::acquire_parquet(const size_t buffer_size) { + size_t expected, desired; + do { + expected = used_size_.load(); + desired = expected + buffer_size; + } while (!used_size_.compare_exchange_weak(expected, desired)); +} + +void MemoryLimitPool::release_buffer(char *buffer, const size_t buffer_size) { + size_t expected, desired; + do { + expected = used_size_.load(); + desired = expected - buffer_size; + assert(expected >= buffer_size); + } while (!used_size_.compare_exchange_weak(expected, desired)); + ailego_free(buffer); +} + +void MemoryLimitPool::release_parquet(const size_t buffer_size) { + size_t expected, desired; + do { + expected = used_size_.load(); + desired = expected - buffer_size; + assert(expected >= buffer_size); + } while (!used_size_.compare_exchange_weak(expected, desired)); +} + +bool MemoryLimitPool::is_full() { + return used_size_.load() >= pool_size_; +} + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/ailego/buffer/buffer_pool.cc b/src/ailego/buffer/buffer_pool.cc deleted file mode 100644 index 38f73f62..00000000 --- a/src/ailego/buffer/buffer_pool.cc +++ /dev/null @@ -1,328 +0,0 @@ -#include -#include - -#if defined(_MSC_VER) -#ifndef NOMINMAX -#define NOMINMAX -#endif -#include -static ssize_t zvec_pread(int fd, void *buf, size_t count, size_t offset) { - HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); - if (handle == INVALID_HANDLE_VALUE) return -1; - OVERLAPPED ov = {}; - ov.Offset = static_cast(offset & 0xFFFFFFFF); - ov.OffsetHigh = static_cast(offset >> 32); - DWORD bytes_read = 0; - if (!ReadFile(handle, buf, static_cast(count), &bytes_read, &ov)) { - return -1; - } - return static_cast(bytes_read); -} -#endif - -namespace zvec { -namespace ailego { - -int LRUCache::init(size_t block_size) { - block_size_ = block_size; - for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { - queues_.push_back(ConcurrentQueue(block_size)); - } - return 0; -} - -bool LRUCache::evict_single_block(BlockType &item) { - bool found = false; - for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { - found = queues_[i].try_dequeue(item); - if (found) { - break; - } - } - return found; -} - -bool LRUCache::add_single_block(const LPMap *lp_map, const BlockType &block, - int block_type) { - bool ok = queues_[block_type].enqueue(block); - if (!ok) { - LOG_ERROR("enqueue failed."); - return false; - } - evict_queue_insertions_.fetch_add(1, std::memory_order_relaxed); - if (evict_queue_insertions_ % block_size_ == 0) { - this->clear_dead_node(lp_map); - } - return true; -} - -void LRUCache::clear_dead_node(const LPMap *lp_map) { - for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { - size_t clear_size = block_size_ * 2; - if (queues_[i].size_approx() < clear_size * 4) { - continue; - } - size_t clear_count = 0; - ConcurrentQueue tmp(block_size_); - BlockType item; - while (queues_[i].try_dequeue(item) && (clear_count++ < clear_size)) { - if (!lp_map->isDeadBlock(item)) { - if (!tmp.enqueue(item)) { - LOG_ERROR("enqueue failed."); - } - } - } - while (tmp.try_dequeue(item)) { - if (!lp_map->isDeadBlock(item)) { - if (!queues_[i].enqueue(item)) { - LOG_ERROR("enqueue failed."); - } - } - } - } -} - -void LPMap::init(size_t entry_num) { - if (entries_) { - delete[] entries_; - } - entry_num_ = entry_num; - entries_ = new Entry[entry_num_]; - for (size_t i = 0; i < entry_num_; i++) { - entries_[i].ref_count.store(std::numeric_limits::min()); - entries_[i].load_count.store(0); - entries_[i].buffer = nullptr; - } - cache_.init(entry_num * 4); -} - -char *LPMap::acquire_block(block_id_t block_id, bool lru_mode) { - assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; - if (!lru_mode) { - return entry.buffer; - } - while (true) { - int current_count = entry.ref_count.load(std::memory_order_acquire); - if (current_count < 0) { - return nullptr; - } - if (entry.ref_count.compare_exchange_weak(current_count, current_count + 1, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - if (current_count == 0) { - entry.load_count.fetch_add(1, std::memory_order_relaxed); - } - return entry.buffer; - } - } -} - -void LPMap::release_block(block_id_t block_id) { - assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; - - if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) { - std::atomic_thread_fence(std::memory_order_acquire); - LRUCache::BlockType block; - block.first = block_id; - block.second = entry.load_count.load(); - cache_.add_single_block(this, block, 0); - } -} - -char *LPMap::evict_block(block_id_t block_id) { - assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; - int expected = 0; - if (entry.ref_count.compare_exchange_strong( - expected, std::numeric_limits::min())) { - char *buffer = entry.buffer; - entry.buffer = nullptr; - return buffer; - } else { - return nullptr; - } -} - -char *LPMap::set_block_acquired(block_id_t block_id, char *buffer) { - assert(block_id < entry_num_); - Entry &entry = entries_[block_id]; - while (true) { - int current_count = entry.ref_count.load(std::memory_order_relaxed); - if (current_count >= 0) { - if (entry.ref_count.compare_exchange_weak( - current_count, current_count + 1, std::memory_order_acq_rel, - std::memory_order_acquire)) { - return entry.buffer; - } - } else { - if (entry.ref_count.compare_exchange_weak(current_count, 1, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - entry.buffer = buffer; - entry.load_count.fetch_add(1, std::memory_order_relaxed); - return entry.buffer; - } - } - } -} - -void LPMap::recycle(moodycamel::ConcurrentQueue &free_buffers) { - LRUCache::BlockType block; - do { - bool ok = cache_.evict_single_block(block); - if (!ok) { - return; - } - } while (isDeadBlock(block)); - char *buffer = evict_block(block.first); - if (buffer) { - if (!free_buffers.enqueue(buffer)) { - LOG_ERROR("recycle buffer enqueue failed."); - ailego_free(buffer); - } - } -} - -VecBufferPool::VecBufferPool(const std::string &filename) { -#if defined(_MSC_VER) - fd_ = _open(filename.c_str(), O_RDONLY | _O_BINARY); -#else - fd_ = open(filename.c_str(), O_RDONLY); -#endif - if (fd_ < 0) { - throw std::runtime_error("Failed to open file: " + filename); - } -#if defined(_MSC_VER) - struct _stat64 st; - if (_fstat64(fd_, &st) < 0) { - _close(fd_); -#else - struct stat st; - if (fstat(fd_, &st) < 0) { - ::close(fd_); -#endif - throw std::runtime_error("Failed to stat file: " + filename); - } - file_size_ = st.st_size; -} - -int VecBufferPool::init(size_t pool_capacity, size_t block_size, - size_t segment_count) { - if (block_size == 0) { - LOG_ERROR("block_size must not be 0"); - return -1; - } - pool_capacity_ = pool_capacity; - size_t buffer_num = pool_capacity_ / block_size + 10; - size_t block_num = segment_count + 10; - lp_map_.init(block_num); - mutex_vec_.reserve(block_num); - for (int i = 0; i < block_num; i++) { - mutex_vec_.emplace_back(std::make_unique()); - } - for (size_t i = 0; i < buffer_num; i++) { - char *buffer = (char *)ailego_malloc(block_size); - if (buffer != nullptr) { - if (!free_buffers_.enqueue(buffer)) { - LOG_ERROR("recycle buffer enqueue failed."); - ailego_free(buffer); - return -1; - } - } else { - LOG_ERROR("aligned_alloc %zu(size: %zu) failed", i, block_size); - return -1; - } - } - LOG_DEBUG("Buffer pool num: %zu, entry num: %zu", buffer_num, - lp_map_.entry_num()); - no_lru_mode_ = false; - if (lp_map_.entry_num() <= buffer_num) { - no_lru_mode_ = true; - } - return 0; -} - -VecBufferPoolHandle VecBufferPool::get_handle() { - return VecBufferPoolHandle(*this); -} - -char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset, - size_t size, int retry) { - char *buffer = lp_map_.acquire_block(block_id, !no_lru_mode()); - if (buffer) { - return buffer; - } - std::lock_guard lock(*mutex_vec_[block_id]); - buffer = lp_map_.acquire_block(block_id, !no_lru_mode()); - if (buffer) { - return buffer; - } - { - bool found = free_buffers_.try_dequeue(buffer); - if (!found && !no_lru_mode_) { - for (int i = 0; i < retry; i++) { - lp_map_.recycle(free_buffers_); - found = free_buffers_.try_dequeue(buffer); - if (found) { - break; - } - } - } - if (!found) { - LOG_ERROR("Buffer pool failed to get free buffer"); - return nullptr; - } - } - -#if defined(_MSC_VER) - ssize_t read_bytes = zvec_pread(fd_, buffer, size, offset); -#else - ssize_t read_bytes = pread(fd_, buffer, size, offset); -#endif - if (read_bytes != static_cast(size)) { - LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset); - free_buffers_.enqueue(buffer); - return nullptr; - } - return lp_map_.set_block_acquired(block_id, buffer); -} - -int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { -#if defined(_MSC_VER) - ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset); -#else - ssize_t read_bytes = pread(fd_, buffer, length, offset); -#endif - if (read_bytes != static_cast(length)) { - LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset); - return -1; - } - return 0; -} - -char *VecBufferPoolHandle::get_block(size_t offset, size_t size, - size_t block_id) { - char *buffer = pool_.acquire_buffer(block_id, offset, size, 5); - return buffer; -} - -int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) { - return pool_.get_meta(offset, length, buffer); -} - -void VecBufferPoolHandle::release_one(block_id_t block_id) { - if (!pool_.no_lru_mode()) { - pool_.lp_map_.release_block(block_id); - } -} - -void VecBufferPoolHandle::acquire_one(block_id_t block_id) { - if (!pool_.no_lru_mode()) { - pool_.lp_map_.acquire_block(block_id, true); - } -} - -} // namespace ailego -} // namespace zvec \ No newline at end of file diff --git a/src/ailego/buffer/parquet_hash_table.cc b/src/ailego/buffer/parquet_hash_table.cc new file mode 100644 index 00000000..ab519843 --- /dev/null +++ b/src/ailego/buffer/parquet_hash_table.cc @@ -0,0 +1,250 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace zvec { +namespace ailego { + +ParquetBufferID::ParquetBufferID(std::string &filename, int column, + int row_group) + : filename(filename), column(column), row_group(row_group) { + struct stat file_stat; + if (stat(filename.c_str(), &file_stat) == 0) { + // file_stat.st_ino contains the inode number + // file_stat.st_dev contains the device ID + // Together they uniquely identify a file + file_id = file_stat.st_ino; + std::filesystem::path p(filename); + auto ftime = std::filesystem::last_write_time(p); + mtime = static_cast(ftime.time_since_epoch().count()); + } +} + +const std::string ParquetBufferID::to_string() const { + std::string msg{"Buffer["}; + msg += "parquet: " + filename + "[" + std::to_string(file_id) + "]" + + ", column: " + std::to_string(column) + + ", row_group: " + std::to_string(row_group); + msg += ", mtime: " + std::to_string(mtime); + msg += "]"; + return msg; +} + +ParquetBufferContextHandle::ParquetBufferContextHandle( + const ParquetBufferContextHandle &handle_) + : buffer_id_(handle_.buffer_id_), arrow_(handle_.arrow_) { + if (arrow_) { + ParquetBufferPool::get_instance().acquire_locked(buffer_id_); + } +} + +ParquetBufferContextHandle::~ParquetBufferContextHandle() { + if (arrow_) { + ParquetBufferPool::get_instance().release(buffer_id_); + } +} + +arrow::Status ParquetBufferPool::acquire(ParquetBufferID buffer_id, + ParquetBufferContext &context) { + // TODO: file handler and memory pool can be optimized + arrow::MemoryPool *mem_pool = arrow::default_memory_pool(); + + // Open file + std::shared_ptr input; + const auto &file_name = buffer_id.filename; + ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(file_name)); + + // Open reader + std::unique_ptr reader; + ARROW_ASSIGN_OR_RAISE(reader, parquet::arrow::OpenFile(input, mem_pool)); + + // Perform read + int row_group = buffer_id.row_group; + int column = buffer_id.column; + auto s = reader->RowGroup(row_group)->Column(column)->Read(&context.arrow); + if (!s.ok()) { + LOG_ERROR("Failed to read parquet file[%s]", file_name.c_str()); + context.arrow = nullptr; + return s; + } + + context.size = 0; + context.arrow_refs.clear(); + // Compute the memory usage and hijack Arrow's buffers with our + // implementation + for (auto &array : context.arrow->chunks()) { + auto &buffers = array->data()->buffers; + for (size_t buf_idx = 0; buf_idx < buffers.size(); ++buf_idx) { + if (buffers[buf_idx] == nullptr) { + continue; + } + // Keep references to original buffers to prevent premature deletion + context.arrow_refs.emplace_back(buffers[buf_idx]); + context.size += buffers[buf_idx]->capacity(); + // Create hijacked buffer with custom deleter that notifies us when + // Arrow is finished with the buffer + std::shared_ptr hijacked_buffer( + buffers[buf_idx].get(), ArrowBufferDeleter(this, buffer_id)); + buffers[buf_idx] = hijacked_buffer; + } + } + + return arrow::Status::OK(); +} + +ParquetBufferContextHandle ParquetBufferPool::acquire_buffer( + ParquetBufferID buffer_id) { + std::shared_ptr arrow{nullptr}; + { + std::shared_lock lock(table_mutex_); + auto iter = table_.find(buffer_id); + if (iter != table_.end()) { + arrow = acquire(buffer_id); + if (arrow != nullptr) { + return ParquetBufferContextHandle(buffer_id, arrow); + } + } + } + { + bool found = !MemoryLimitPool::get_instance().is_full(); + if (!found) { + for (int i = 0; i < 5; i++) { + BlockEvictionQueue::get_instance().recycle(); + found = !MemoryLimitPool::get_instance().is_full(); + if (found) { + break; + } + } + } + if (!found) { + LOG_ERROR("Failed to acquire parquet buffer: %s", + buffer_id.to_string().c_str()); + return ParquetBufferContextHandle(); + } + std::unique_lock lock(table_mutex_); + if (acquire(buffer_id, table_[buffer_id]).ok()) { + MemoryLimitPool::get_instance().acquire_parquet(table_[buffer_id].size); + arrow = set_block_acquired(buffer_id); + return ParquetBufferContextHandle(buffer_id, arrow); + } else { + LOG_ERROR("Failed to acquire parquet buffer: %s", + buffer_id.to_string().c_str()); + return ParquetBufferContextHandle(); + } + } +} + +std::shared_ptr ParquetBufferPool::set_block_acquired( + ParquetBufferID buffer_id) { + ParquetBufferContext &context = table_[buffer_id]; + while (true) { + int current_count = context.ref_count.load(std::memory_order_relaxed); + if (current_count >= 0) { + if (context.ref_count.compare_exchange_weak( + current_count, current_count + 1, std::memory_order_acq_rel, + std::memory_order_acquire)) { + return context.arrow; + } + } else { + if (context.ref_count.compare_exchange_weak(current_count, 1, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + context.load_count.fetch_add(1, std::memory_order_relaxed); + return context.arrow; + } + } + } +} + +std::shared_ptr ParquetBufferPool::acquire( + ParquetBufferID buffer_id) { + auto iter = table_.find(buffer_id); + if (iter == table_.end()) { + return nullptr; + } + ParquetBufferContext &context = table_[buffer_id]; + while (true) { + int current_count = context.ref_count.load(std::memory_order_acquire); + if (current_count < 0) { + return nullptr; + } + if (context.ref_count.compare_exchange_weak( + current_count, current_count + 1, std::memory_order_acq_rel, + std::memory_order_acquire)) { + if (current_count == 0) { + context.load_count.fetch_add(1, std::memory_order_relaxed); + } + return context.arrow; + } + } + return nullptr; +} + +std::shared_ptr ParquetBufferPool::acquire_locked( + ParquetBufferID buffer_id) { + std::shared_lock lock(table_mutex_); + return acquire(buffer_id); +} + +void ParquetBufferPool::release(ParquetBufferID buffer_id) { + std::shared_lock lock(table_mutex_); + auto iter = table_.find(buffer_id); + if (iter == table_.end()) { + return; + } + ParquetBufferContext &context = table_[buffer_id]; + if (context.ref_count.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + BlockEvictionQueue::BlockType block; + block.parquet_buffer_block.first = buffer_id; + block.parquet_buffer_block.second = context.load_count.load(); + BlockEvictionQueue::get_instance().add_single_block(block, 0); + } +} + +void ParquetBufferPool::evict(ParquetBufferID buffer_id) { + std::unique_lock lock(table_mutex_); + auto iter = table_.find(buffer_id); + if (iter == table_.end()) { + return; + } + ParquetBufferContext &context = table_[buffer_id]; + int expected = 0; + if (context.ref_count.compare_exchange_strong( + expected, std::numeric_limits::min())) { + MemoryLimitPool::get_instance().release_parquet(context.size); + context.arrow = nullptr; + context.arrow_refs.clear(); + } +} + +bool ParquetBufferPool::is_dead_node(BlockEvictionQueue::BlockType &block) { + std::shared_lock lock(table_mutex_); + auto iter = table_.find(block.parquet_buffer_block.first); + if (iter == table_.end()) { + return true; + } + return iter->second.load_count.load() != block.parquet_buffer_block.second; +} + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc new file mode 100644 index 00000000..fec7a190 --- /dev/null +++ b/src/ailego/buffer/vector_page_table.cc @@ -0,0 +1,277 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#if !defined(_MSC_VER) +#include +#endif + +#if defined(_MSC_VER) +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +static ssize_t zvec_pread(int fd, void *buf, size_t count, size_t offset) { + HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); + if (handle == INVALID_HANDLE_VALUE) return -1; + OVERLAPPED ov = {}; + ov.Offset = static_cast(offset & 0xFFFFFFFF); + ov.OffsetHigh = static_cast(offset >> 32); + DWORD bytes_read = 0; + if (!ReadFile(handle, buf, static_cast(count), &bytes_read, &ov)) { + return -1; + } + return static_cast(bytes_read); +} +#endif + +namespace zvec { +namespace ailego { + +void VectorPageTable::init(size_t entry_num) { + if (entries_) { + delete[] entries_; + } + entry_num_ = entry_num; + entries_ = new Entry[entry_num_]; + for (size_t i = 0; i < entry_num_; i++) { + entries_[i].ref_count.store(std::numeric_limits::min()); + entries_[i].in_evict_queue.store(false); + entries_[i].buffer = nullptr; + } +} + +char *VectorPageTable::acquire_block(block_id_t block_id) { + assert(block_id < entry_num_); + Entry &entry = entries_[block_id]; + while (true) { + int current_count = entry.ref_count.load(std::memory_order_acquire); + if (current_count < 0) { + return nullptr; + } + if (entry.ref_count.compare_exchange_weak(current_count, current_count + 1, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + return entry.buffer; + } + } +} + +void VectorPageTable::release_block(block_id_t block_id) { + assert(block_id < entry_num_); + Entry &entry = entries_[block_id]; + + if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + // Attempt to transition in_evict_queue from false -> true. The CAS ensures + // only one thread enqueues this block even if multiple threads race here. + bool expected = false; + if (entry.in_evict_queue.compare_exchange_strong( + expected, true, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + BlockEvictionQueue::BlockType block; + block.page_table = this; + block.vector_block.first = block_id; + block.vector_block.second = 0; + BlockEvictionQueue::get_instance().add_single_block(block, 0); + } + // else: block is already in the eviction queue; do not add a duplicate + // entry. + } +} + +void VectorPageTable::evict_block(block_id_t block_id) { + assert(block_id < entry_num_); + Entry &entry = entries_[block_id]; + char *buffer = entry.buffer; + size_t size = entry.size; + int expected = 0; + if (entry.ref_count.compare_exchange_strong( + expected, std::numeric_limits::min())) { + if (buffer) { + MemoryLimitPool::get_instance().release_buffer(buffer, size); + } + } + // Always reset in_evict_queue regardless of whether the CAS succeeded: + // - On success: the block is evicted; future releases should re-register it. + // - On failure: the block was re-acquired by another thread between the + // ref-count check and this call. Clearing in_evict_queue lets the next + // release_block() re-enqueue it so it is not silently lost. + entry.in_evict_queue.store(false, std::memory_order_relaxed); +} + +char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, + size_t size) { + assert(block_id < entry_num_); + Entry &entry = entries_[block_id]; + while (true) { + int current_count = entry.ref_count.load(std::memory_order_relaxed); + if (current_count >= 0) { + // Defensive branch: in practice this path should never be reached. + // set_block_acquired() is always called under block_mutexes_[block_id], + // and the caller (acquire_buffer) re-checks acquire_block() inside the + // same lock before invoking this function. Therefore, if we get here, + // ref_count must still be negative (unloaded). This branch is retained + // as a safety net in case the locking contract is violated in the future, + // e.g. if set_block_acquired is called from an unlocked context. + if (entry.ref_count.compare_exchange_weak( + current_count, current_count + 1, std::memory_order_acq_rel, + std::memory_order_acquire)) { + MemoryLimitPool::get_instance().release_buffer(buffer, size); + return entry.buffer; + } + } else { + entry.buffer = buffer; + entry.size = size; + // Ensure in_evict_queue is cleared when the block is freshly loaded so + // that the first release_block() after loading can register it in the + // eviction queue. + entry.in_evict_queue.store(false, std::memory_order_relaxed); + entry.ref_count.store(1, std::memory_order_release); + return entry.buffer; + } + } +} + +VecBufferPool::VecBufferPool(const std::string &filename) { + file_name_ = filename; +#if defined(_MSC_VER) + fd_ = _open(filename.c_str(), O_RDONLY | _O_BINARY); +#else + fd_ = open(filename.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + throw std::runtime_error("Failed to open file: " + filename); + } +#if defined(_MSC_VER) + struct _stat64 st; + if (_fstat64(fd_, &st) < 0) { + _close(fd_); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + ::close(fd_); +#endif + throw std::runtime_error("Failed to stat file: " + filename); + } + file_size_ = st.st_size; +} + +int VecBufferPool::init(size_t segment_count) { + size_t block_num = segment_count + 10; + page_table_.init(block_num); + // Allocate all mutexes in a single contiguous array so that the cold-path + // lock in acquire_buffer() accesses cache-friendly memory instead of + // chasing 31K+ independent heap pointers. + block_mutexes_ = std::make_unique(block_num); + block_mutexes_count_ = block_num; + LOG_DEBUG("entry num: %zu", page_table_.entry_num()); + return 0; +} + +VecBufferPoolHandle VecBufferPool::get_handle() { + return VecBufferPoolHandle(*this); +} + +char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset, + size_t size, int retry) { + assert(block_id < block_mutexes_count_); + char *buffer = page_table_.acquire_block(block_id); + if (buffer) { + return buffer; + } + std::lock_guard lock(block_mutexes_[block_id]); + buffer = page_table_.acquire_block(block_id); + if (buffer) { + return buffer; + } + { + bool found = + MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + if (!found) { + for (int i = 0; i < retry; i++) { + BlockEvictionQueue::get_instance().recycle(); + found = + MemoryLimitPool::get_instance().try_acquire_buffer(size, buffer); + if (found) { + break; + } + } + } + if (!found) { + LOG_ERROR( + "Buffer pool failed to get free buffer: file[%s], block_id[%zu], " + "offset[%zu], size[%zu]", + file_name_.c_str(), block_id, offset, size); + return nullptr; + } + } + +#if defined(_MSC_VER) + ssize_t read_bytes = zvec_pread(fd_, buffer, size, offset); +#else + ssize_t read_bytes = pread(fd_, buffer, size, offset); +#endif + if (read_bytes != static_cast(size)) { + LOG_ERROR( + "Buffer pool failed to read file at offset: file[%s], block_id[%zu], " + "offset[%zu], size[%zu]", + file_name_.c_str(), block_id, offset, size); + MemoryLimitPool::get_instance().release_buffer(buffer, size); + return nullptr; + } + return page_table_.set_block_acquired(block_id, buffer, size); +} + +int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { +#if defined(_MSC_VER) + ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset); +#else + ssize_t read_bytes = pread(fd_, buffer, length, offset); +#endif + if (read_bytes != static_cast(length)) { + LOG_ERROR( + "Buffer pool failed to read file at offset: file[%s], offset[%zu], " + "length[%zu]", + file_name_.c_str(), offset, length); + return -1; + } + return 0; +} + +char *VecBufferPoolHandle::get_block(size_t offset, size_t size, + size_t block_id) { + char *buffer = pool_.acquire_buffer(block_id, offset, size, 50); + return buffer; +} + +int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) { + return pool_.get_meta(offset, length, buffer); +} + +void VecBufferPoolHandle::release_one(block_id_t block_id) { + pool_.page_table_.release_block(block_id); +} + +void VecBufferPoolHandle::acquire_one(block_id_t block_id) { + // The caller must guarantee the block is already loaded before calling + // acquire_one(). The return value of acquire_block() is intentionally + // ignored here, as a null return would indicate a contract violation. + pool_.page_table_.acquire_block(block_id); +} + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/ailego/internal/cpu_features.cc b/src/ailego/internal/cpu_features.cc index e2dd2b23..066b0a6a 100644 --- a/src/ailego/internal/cpu_features.cc +++ b/src/ailego/internal/cpu_features.cc @@ -17,7 +17,9 @@ #if defined(_MSC_VER) #include -#elif !defined(__ARM_ARCH) +#endif + +#if (defined(__x86_64__) || defined(__i386__)) && !defined(_MSC_VER) #include #endif @@ -34,7 +36,7 @@ namespace internal { CpuFeatures::CpuFlags CpuFeatures::flags_; -#if defined(_MSC_VER) +#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_IX86)) CpuFeatures::CpuFlags::CpuFlags(void) : L1_ECX(0), L1_EDX(0), L7_EBX(0), L7_ECX(0), L7_EDX(0) { int l1[4] = {0, 0, 0, 0}; @@ -48,7 +50,7 @@ CpuFeatures::CpuFlags::CpuFlags(void) L7_ECX = l7[2]; L7_EDX = l7[3]; } -#elif !defined(__ARM_ARCH) +#elif defined(__x86_64__) || defined(__i386__) CpuFeatures::CpuFlags::CpuFlags(void) : L1_ECX(0), L1_EDX(0), L7_EBX(0), L7_ECX(0), L7_EDX(0) { uint32_t eax, ebx, ecx, edx; diff --git a/src/core/algorithm/cluster/CMakeLists.txt b/src/core/algorithm/cluster/CMakeLists.txt index b0ccc79b..d954b0a3 100644 --- a/src/core/algorithm/cluster/CMakeLists.txt +++ b/src/core/algorithm/cluster/CMakeLists.txt @@ -1,10 +1,19 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +# --exclude-libs is GNU ld / LLVM lld only; Apple ld does not support it. +# On macOS (Mach-O), symbol interposition works differently and the +# Arrow/Parquet double-free issue does not apply. +if(NOT APPLE) + set(CORE_KNN_CLUSTER_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_cluster STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc - LIBS zvec_ailego core_framework + LIBS zvec_ailego core_framework INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/cluster + LDFLAGS "${CORE_KNN_CLUSTER_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/hnsw/hnsw_entity.h b/src/core/algorithm/hnsw/hnsw_entity.h index 71f2080c..74b959e8 100644 --- a/src/core/algorithm/hnsw/hnsw_entity.h +++ b/src/core/algorithm/hnsw/hnsw_entity.h @@ -516,8 +516,8 @@ class HnswEntity { constexpr static uint32_t kDefaultDocsHardLimit = 1 << 30U; // 1 billion constexpr static float kDefaultDocsSoftLimitRatio = 0.9f; constexpr static size_t kMaxChunkSize = 0xFFFFFFFF; - constexpr static size_t kDefaultChunkSize = 2 * 1024UL * 1024UL; - constexpr static size_t kDefaultMaxChunkCnt = 50000UL; + constexpr static size_t kDefaultChunkSize = 16 * 1024UL; + constexpr static size_t kDefaultMaxChunkCnt = 128 * 50000UL; constexpr static float kDefaultNeighborPruneMultiplier = 1.0f; // prune_cnt = upper_max_neighbor_cnt * multiplier constexpr static float kDefaultL0MaxNeighborCntMultiplier = diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index a20a0316..aa2fd19b 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -175,8 +175,6 @@ class BufferStorage : public IndexStorage { //! Initialize storage int init(const ailego::Params ¶ms) override { - params.get(BUFFER_STORAGE_MEMORY_SIZE, &buffer_size_); - LOG_INFO("buffer size: %lu", buffer_size_); return 0; } @@ -196,16 +194,14 @@ class BufferStorage : public IndexStorage { if (ret != 0) { return ret; } - ret = buffer_pool_->init(buffer_size_, max_segment_size_, segments_.size()); - // for (auto iter = segments_.begin(); iter != segments_.end(); iter++) { - // auto seg = this->get(iter->first, 0); - // MemoryBlock block; - // int len = seg->read(0, block, 1); - // LOG_ERROR("segment %s: %d", iter->first.c_str(), len); - // } + ret = buffer_pool_->init(segments_.size()); if (ret != 0) { return ret; } + LOG_INFO( + "BufferStorage opened: file=%s, max_segment_size=%lu, " + "segment_count=%zu", + file_name_.c_str(), max_segment_size_, segments_.size()); return 0; } diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h index fa28c644..c57e6e98 100644 --- a/src/core/utility/utility_params.h +++ b/src/core/utility/utility_params.h @@ -72,10 +72,6 @@ static const std::string MMAPFILE_STORAGE_FORCE_FLUSH = static const std::string MMAPFILE_STORAGE_SEGMENT_META_CAPACITY = "proxima.mmap_file.storage.segment_meta_capacity"; -//! BufferStorage -static const std::string BUFFER_STORAGE_MEMORY_SIZE = - "proxima.buffer.storage.memory_size"; - //! MipsConverter static const std::string MIPS_CONVERTER_M_VALUE = "proxima.mips.converter.m_value"; diff --git a/src/db/common/global_resource.cc b/src/db/common/global_resource.cc index 2f4ad1ca..6c079d4c 100644 --- a/src/db/common/global_resource.cc +++ b/src/db/common/global_resource.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "db/common/global_resource.h" #include +#include #include #include @@ -25,8 +26,8 @@ void GlobalResource::initialize() { new ailego::ThreadPool(GlobalConfig::Instance().query_thread_count())); this->optimize_thread_pool_.reset(new ailego::ThreadPool( GlobalConfig::Instance().optimize_thread_count())); - ailego::BufferManager::Instance().init( - GlobalConfig::Instance().memory_limit_bytes(), 1); + zvec::ailego::MemoryLimitPool::get_instance().init( + GlobalConfig::Instance().memory_limit_bytes()); }); } diff --git a/src/db/index/segment/segment.cc b/src/db/index/segment/segment.cc index 821d236e..34894d18 100644 --- a/src/db/index/segment/segment.cc +++ b/src/db/index/segment/segment.cc @@ -3415,8 +3415,8 @@ Status SegmentImpl::alter_column(const std::string &column_name, } if (!options_.enable_mmap_) { - ailego::BufferManager::Instance().init( - GlobalConfig::Instance().memory_limit_bytes(), 1); + zvec::ailego::MemoryLimitPool::get_instance().init( + GlobalConfig::Instance().memory_limit_bytes()); } // delete single column store file @@ -3510,8 +3510,8 @@ Status SegmentImpl::drop_column(const std::string &column_name) { } if (!options_.enable_mmap_) { - ailego::BufferManager::Instance().init( - GlobalConfig::Instance().memory_limit_bytes(), 1); + zvec::ailego::MemoryLimitPool::get_instance().init( + GlobalConfig::Instance().memory_limit_bytes()); } // delete single column store file diff --git a/src/db/index/storage/bufferpool_forward_store.cc b/src/db/index/storage/bufferpool_forward_store.cc index a8cbaee3..4d2b2f6e 100644 --- a/src/db/index/storage/bufferpool_forward_store.cc +++ b/src/db/index/storage/bufferpool_forward_store.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "db/index/storage/store_helper.h" #include "lazy_record_batch_reader.h" @@ -192,10 +193,11 @@ TablePtr BufferPoolForwardStore::fetch(const std::vector &columns, for (const auto &[rg_id, pairs] : rg_to_local) { for (size_t i = 0; i < col_indices.size(); ++i) { int col_idx = col_indices[i]; - auto buffer_id = ailego::BufferID::ParquetID(file_path_, col_idx, rg_id); - auto buffer_handle = buf_mgr.acquire(buffer_id); - auto col_chunked_array = buffer_handle.pin_parquet_data(); - + auto buffer_id = ailego::ParquetBufferID(file_path_, col_idx, rg_id); + auto buffer_handle = + ailego::ParquetBufferPool::get_instance().acquire_buffer(buffer_id); + std::shared_ptr col_chunked_array = + buffer_handle.data(); if (!col_chunked_array) { LOG_ERROR( "Failed to pin parquet data for file: %s, column: %d, row_group: " @@ -318,9 +320,11 @@ ExecBatchPtr BufferPoolForwardStore::fetch( auto &buf_mgr = ailego::BufferManager::Instance(); for (size_t i = 0; i < col_indices.size(); ++i) { int col_idx = col_indices[i]; - auto buffer_id = ailego::BufferID::ParquetID(file_path_, col_idx, rg_id); - auto buffer_handle = buf_mgr.acquire(buffer_id); - auto col_chunked_array = buffer_handle.pin_parquet_data(); + auto buffer_id = ailego::ParquetBufferID(file_path_, col_idx, rg_id); + auto buffer_handle = + ailego::ParquetBufferPool::get_instance().acquire_buffer(buffer_id); + std::shared_ptr col_chunked_array = + buffer_handle.data(); if (!col_chunked_array) { LOG_ERROR( diff --git a/src/db/index/storage/lazy_record_batch_reader.h b/src/db/index/storage/lazy_record_batch_reader.h index c9e124c5..422708ed 100644 --- a/src/db/index/storage/lazy_record_batch_reader.h +++ b/src/db/index/storage/lazy_record_batch_reader.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "db/common/constants.h" @@ -128,10 +129,11 @@ class ParquetRecordBatchReader : public arrow::RecordBatchReader { if (with_cache_) { auto &buf_mgr = ailego::BufferManager::Instance(); for (size_t col_idx = 0; col_idx < col_indices_.size(); ++col_idx) { - auto buffer_id = ailego::BufferID::ParquetID( - file_path_, col_indices_[col_idx], rg_id); - auto buffer_handle = buf_mgr.acquire(buffer_id); - auto col_chunked_array = buffer_handle.pin_parquet_data(); + auto buffer_id = ailego::ParquetBufferID(file_path_, col_idx, rg_id); + auto buffer_handle = + ailego::ParquetBufferPool::get_instance().acquire_buffer(buffer_id); + std::shared_ptr col_chunked_array = + buffer_handle.data(); if (col_chunked_array) { std::shared_ptr concat; auto concat_result = arrow::Concatenate(col_chunked_array->chunks(), diff --git a/src/include/zvec/ailego/buffer/block_eviction_queue.h b/src/include/zvec/ailego/buffer/block_eviction_queue.h new file mode 100644 index 00000000..2c78b99c --- /dev/null +++ b/src/include/zvec/ailego/buffer/block_eviction_queue.h @@ -0,0 +1,158 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "concurrentqueue.h" + +#if defined(_MSC_VER) +#include +#endif + +namespace zvec { +namespace ailego { + +class VectorPageTable; + +using block_id_t = size_t; +using version_t = size_t; + +struct ParquetBufferID { + std::string filename; + int column; + int row_group; + uint64_t file_id; + long mtime; + ParquetBufferID() = default; + ParquetBufferID(std::string &filename, int column, int row_group); + const std::string to_string() const; +}; + +class BlockEvictionQueue { + public: + struct BlockType { + std::pair vector_block; + std::pair parquet_buffer_block; + VectorPageTable *page_table{nullptr}; + }; + typedef moodycamel::ConcurrentQueue ConcurrentQueue; + + static BlockEvictionQueue &get_instance() { + static BlockEvictionQueue instance; + return instance; + } + BlockEvictionQueue(const BlockEvictionQueue &) = delete; + BlockEvictionQueue &operator=(const BlockEvictionQueue &) = delete; + BlockEvictionQueue(BlockEvictionQueue &&) = delete; + BlockEvictionQueue &operator=(BlockEvictionQueue &&) = delete; + + int init(); + + bool evict_single_block(BlockType &item); + + bool evict_block(BlockType &item); + + bool add_single_block(const BlockType &block, int queue_index); + + // void clear_dead_node(); + + bool is_valid(VectorPageTable *page_table) { + std::shared_lock lock(valid_page_tables_mutex_); + return valid_page_tables_.find(page_table) != valid_page_tables_.end(); + } + + void set_valid(VectorPageTable *page_table) { + std::unique_lock lock(valid_page_tables_mutex_); + valid_page_tables_.insert(page_table); + } + + void set_invalid(VectorPageTable *page_table) { + std::unique_lock lock(valid_page_tables_mutex_); + valid_page_tables_.erase(page_table); + } + + // Atomically checks under the shared lock that the page table is still valid + // AND the block version has not been superseded, preventing TOCTOU races + // when a VectorPageTable is concurrently destroyed. + bool is_valid_and_alive(const BlockType &item); + + void recycle(); + + private: + BlockEvictionQueue() { + init(); + } + + private: + constexpr static size_t CACHE_QUEUE_NUM = 3; + size_t evict_batch_size_{0}; + std::vector evict_queues_; + std::unordered_set valid_page_tables_; + std::shared_mutex valid_page_tables_mutex_; +}; + +class MemoryLimitPool { + public: + static MemoryLimitPool &get_instance() { + static MemoryLimitPool instance; + return instance; + } + MemoryLimitPool(const MemoryLimitPool &) = delete; + MemoryLimitPool &operator=(const MemoryLimitPool &) = delete; + MemoryLimitPool(MemoryLimitPool &&) = delete; + MemoryLimitPool &operator=(MemoryLimitPool &&) = delete; + + int init(size_t pool_size); + + bool try_acquire_buffer(const size_t buffer_size, char *&buffer); + + void acquire_parquet(const size_t buffer_size); + + void release_buffer(char *buffer, const size_t buffer_size); + + void release_parquet(const size_t buffer_size); + + bool is_full(); + + private: + MemoryLimitPool() = default; + + private: + size_t pool_size_{0}; + std::atomic used_size_{0}; +}; + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/ailego/buffer/buffer_pool.h b/src/include/zvec/ailego/buffer/buffer_pool.h deleted file mode 100644 index 69a01b2f..00000000 --- a/src/include/zvec/ailego/buffer/buffer_pool.h +++ /dev/null @@ -1,173 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "concurrentqueue.h" - -#if defined(_MSC_VER) -#include -#endif - -namespace zvec { -namespace ailego { - -using block_id_t = size_t; -using version_t = size_t; - -class LPMap; - -class LRUCache { - public: - typedef std::pair BlockType; - typedef moodycamel::ConcurrentQueue ConcurrentQueue; - - int init(size_t block_size); - - bool evict_single_block(BlockType &item); - - bool add_single_block(const LPMap *lp_map, const BlockType &block, - int block_type); - - void clear_dead_node(const LPMap *lp_map); - - private: - constexpr static size_t CATCH_QUEUE_NUM = 3; - size_t block_size_{0}; - std::vector queues_; - alignas(64) std::atomic evict_queue_insertions_{0}; -}; - -class LPMap { - struct Entry { - alignas(64) std::atomic ref_count; - alignas(64) std::atomic load_count; - char *buffer; - }; - - public: - LPMap() : entry_num_(0), entries_(nullptr) {} - ~LPMap() { - delete[] entries_; - } - - void init(size_t entry_num); - - char *acquire_block(block_id_t block_id, bool lru_mode); - - void release_block(block_id_t block_id); - - char *evict_block(block_id_t block_id); - - char *set_block_acquired(block_id_t block_id, char *buffer); - - void recycle(moodycamel::ConcurrentQueue &free_buffers); - - size_t entry_num() const { - return entry_num_; - } - - inline bool isDeadBlock(LRUCache::BlockType block) const { - Entry &entry = entries_[block.first]; - return block.second != entry.load_count.load(); - } - - private: - size_t entry_num_{0}; - Entry *entries_{nullptr}; - LRUCache cache_; -}; - -class VecBufferPoolHandle; - -class VecBufferPool { - public: - typedef std::shared_ptr Pointer; - - VecBufferPool(const std::string &filename); - ~VecBufferPool() { - // Free all buffers in the free list - char *buf = nullptr; - while (free_buffers_.try_dequeue(buf)) { - ailego_free(buf); - } - // Free any buffers still pinned in the map - for (size_t i = 0; i < lp_map_.entry_num(); ++i) { - char *b = lp_map_.evict_block(i); - if (b) ailego_free(b); - } -#if defined(_MSC_VER) - _close(fd_); -#else - close(fd_); -#endif - } - - int init(size_t pool_capacity, size_t block_size, size_t segment_count); - - VecBufferPoolHandle get_handle(); - - char *acquire_buffer(block_id_t block_id, size_t offset, size_t size, - int retry = 0); - - int get_meta(size_t offset, size_t length, char *buffer); - - size_t file_size() const { - return file_size_; - } - - bool no_lru_mode() { - return no_lru_mode_; - } - - private: - int fd_; - size_t file_size_; - size_t pool_capacity_; - bool no_lru_mode_; - - public: - LPMap lp_map_; - - private: - std::vector> mutex_vec_; - moodycamel::ConcurrentQueue free_buffers_; -}; - -class VecBufferPoolHandle { - public: - VecBufferPoolHandle(VecBufferPool &pool) : pool_(pool) {} - VecBufferPoolHandle(VecBufferPoolHandle &&other) : pool_(other.pool_) {} - - ~VecBufferPoolHandle() = default; - - typedef std::shared_ptr Pointer; - - char *get_block(size_t offset, size_t size, size_t block_id); - - int get_meta(size_t offset, size_t length, char *buffer); - - void release_one(block_id_t block_id); - - void acquire_one(block_id_t block_id); - - private: - VecBufferPool &pool_; -}; - -} // namespace ailego -} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/ailego/buffer/parquet_hash_table.h b/src/include/zvec/ailego/buffer/parquet_hash_table.h new file mode 100644 index 00000000..f4fb4f62 --- /dev/null +++ b/src/include/zvec/ailego/buffer/parquet_hash_table.h @@ -0,0 +1,164 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "block_eviction_queue.h" + +namespace arrow { +class ChunkedArray; +class Array; +class DataType; +class Scalar; +template +class Result; +class Status; +class Buffer; +} // namespace arrow + +namespace zvec { +namespace ailego { + +class BlockEvictionQueue; + +struct IDHash { + size_t operator()(const ParquetBufferID &buffer_id) const { + size_t hash = std::hash{}(1); + hash = hash ^ (std::hash{}(buffer_id.file_id)); + hash = hash * 31 + std::hash{}(buffer_id.column); + hash = hash * 31 + std::hash{}(buffer_id.row_group); + return hash; + } +}; + +struct IDEqual { + bool operator()(const ParquetBufferID &a, const ParquetBufferID &b) const { + if (a.filename != b.filename) { + return false; + } + if (a.file_id != b.file_id) { + return false; + } + if (a.mtime != b.mtime) { + return false; + } + return a.column == b.column && a.row_group == b.row_group; + } +}; + +struct ParquetBufferContext { + // A shared pointer to the buffers allocated for arrow parquet data + std::shared_ptr arrow{nullptr}; + + // Guard original arrow buffers to prevent premature deletion + std::vector> arrow_refs{}; + + size_t size; + alignas(64) std::atomic ref_count{std::numeric_limits::min()}; + alignas(64) std::atomic load_count{0}; +}; + +class ParquetBufferContextHandle { + public: + ParquetBufferContextHandle() {} + ParquetBufferContextHandle(ParquetBufferID &buffer_id, + std::shared_ptr arrow) + : buffer_id_(buffer_id), arrow_(arrow) {} + ParquetBufferContextHandle(const ParquetBufferContextHandle &handle_); + ParquetBufferContextHandle(ParquetBufferContextHandle &&handle_) + : buffer_id_(std::move(handle_.buffer_id_)), + arrow_(std::move(handle_.arrow_)) {} + + ~ParquetBufferContextHandle(); + + std::shared_ptr data() { + return arrow_; + } + + private: + ParquetBufferID buffer_id_; + std::shared_ptr arrow_{nullptr}; +}; + +class ParquetBufferPool { + public: + typedef std::shared_ptr Pointer; + + struct ArrowBufferDeleter { + explicit ArrowBufferDeleter(ParquetBufferPool *c, ParquetBufferID i) + : pool(c), id(i) {} + ParquetBufferPool *pool; + ParquetBufferID id; + // Only reduces the reference count but does not actually release the + // buffer, since the buffer memory is managed by the BufferManager. + void operator()(arrow::Buffer *) { + return; + } + }; + + using Table = std::unordered_map; + + arrow::Status acquire(ParquetBufferID buffer_id, + ParquetBufferContext &context); + + ParquetBufferContextHandle acquire_buffer(ParquetBufferID buffer_id); + + std::shared_ptr set_block_acquired( + ParquetBufferID buffer_id); + + std::shared_ptr acquire(ParquetBufferID buffer_id); + + std::shared_ptr acquire_locked( + ParquetBufferID buffer_id); + + void release(ParquetBufferID buffer_id); + + void evict(ParquetBufferID buffer_id); + + bool is_dead_node(BlockEvictionQueue::BlockType &block); + + static ParquetBufferPool &get_instance() { + static ParquetBufferPool instance; + return instance; + } + + ParquetBufferPool(const ParquetBufferPool &) = delete; + ParquetBufferPool &operator=(const ParquetBufferPool &) = delete; + ParquetBufferPool(ParquetBufferPool &&) = delete; + ParquetBufferPool &operator=(ParquetBufferPool &&) = delete; + + private: + ParquetBufferPool() = default; + + private: + Table table_; + std::shared_mutex table_mutex_; +}; + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h new file mode 100644 index 00000000..653b7af5 --- /dev/null +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -0,0 +1,178 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "block_eviction_queue.h" +#include "concurrentqueue.h" + +#if defined(_MSC_VER) +#include +#endif + +namespace zvec { +namespace ailego { + +class VectorPageTable { + struct alignas(64) Entry { + std::atomic ref_count; + // True when this block has been enqueued in BlockEvictionQueue and has not + // yet been evicted. Used in release_block() to suppress duplicate + // insertions: once a block is in the eviction queue we never push it again + // until it is evicted (which resets the flag). + std::atomic in_evict_queue; + char *buffer; + size_t size; + }; + + public: + VectorPageTable() : entry_num_(0), entries_(nullptr) { + BlockEvictionQueue::get_instance().set_valid(this); + } + ~VectorPageTable() { + BlockEvictionQueue::get_instance().set_invalid(this); + delete[] entries_; + } + + VectorPageTable(const VectorPageTable &) = delete; + VectorPageTable &operator=(const VectorPageTable &) = delete; + VectorPageTable(VectorPageTable &&) = delete; + VectorPageTable &operator=(VectorPageTable &&) = delete; + + void init(size_t entry_num); + + char *acquire_block(block_id_t block_id); + + void release_block(block_id_t block_id); + + void evict_block(block_id_t block_id); + + char *set_block_acquired(block_id_t block_id, char *buffer, size_t size); + + size_t entry_num() const { + return entry_num_; + } + + // Returns true if the block has no active references (ref_count <= 0). + // Used by VecBufferPool destructor to assert all handles are released. + bool is_released(block_id_t block_id) const { + assert(block_id < entry_num_); + return entries_[block_id].ref_count.load(std::memory_order_relaxed) <= 0; + } + + // Returns true if the block is no longer registered in the eviction queue + // (either it was never added, or it has already been evicted). + // Used by BlockEvictionQueue to detect stale queue entries. + inline bool is_dead_block(BlockEvictionQueue::BlockType block) const { + Entry &entry = entries_[block.vector_block.first]; + return !entry.in_evict_queue.load(std::memory_order_relaxed); + } + + private: + size_t entry_num_{0}; + Entry *entries_{nullptr}; +}; + +class VecBufferPoolHandle; + +class VecBufferPool { + public: + typedef std::shared_ptr Pointer; + + VecBufferPool(const std::string &filename); + ~VecBufferPool() { + for (size_t i = 0; i < page_table_.entry_num(); ++i) { + // A positive ref_count means a VecBufferPoolHandle is still alive, + // which is a contract violation: all handles must be destroyed before + // the pool itself is destroyed. + assert(page_table_.is_released(i)); + page_table_.evict_block(i); + } +#if defined(_MSC_VER) + _close(fd_); +#else + close(fd_); +#endif + } + + int init(size_t segment_count); + + VecBufferPoolHandle get_handle(); + + char *acquire_buffer(block_id_t block_id, size_t offset, size_t size, + int retry = 0); + + int get_meta(size_t offset, size_t length, char *buffer); + + size_t file_size() const { + return file_size_; + } + + private: + int fd_; + size_t file_size_; + std::string file_name_; + + public: + VectorPageTable page_table_; + + private: + // Contiguous array of per-block mutexes (one allocation, cache-friendly for + // the cold-path load in acquire_buffer). block_mutexes_count_ mirrors the + // array length because unique_ptr has no built-in size accessor. + std::unique_ptr block_mutexes_{}; + size_t block_mutexes_count_{0}; +}; + +class VecBufferPoolHandle { + public: + VecBufferPoolHandle(VecBufferPool &pool) : pool_(pool) {} + VecBufferPoolHandle(VecBufferPoolHandle &&other) : pool_(other.pool_) {} + + ~VecBufferPoolHandle() = default; + + typedef std::shared_ptr Pointer; + + char *get_block(size_t offset, size_t size, size_t block_id); + + int get_meta(size_t offset, size_t length, char *buffer); + + void release_one(block_id_t block_id); + + void acquire_one(block_id_t block_id); + + private: + VecBufferPool &pool_; +}; + +} // namespace ailego +} // namespace zvec \ No newline at end of file diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index 600cb3f2..a4bec2cd 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include diff --git a/tests/core/algorithm/flat/flat_streamer_buffer_test.cc b/tests/core/algorithm/flat/flat_streamer_buffer_test.cc index a67529d2..6502d532 100644 --- a/tests/core/algorithm/flat/flat_streamer_buffer_test.cc +++ b/tests/core/algorithm/flat/flat_streamer_buffer_test.cc @@ -47,6 +47,7 @@ void FlatStreamerTest::TearDown(void) { } TEST_F(FlatStreamerTest, TestLinearSearch) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); IndexStreamer::Pointer write_streamer = IndexFactory::CreateStreamer("FlatStreamer"); ASSERT_TRUE(write_streamer != nullptr); @@ -168,6 +169,7 @@ TEST_F(FlatStreamerTest, TestLinearSearch) { } TEST_F(FlatStreamerTest, TestLinearSearchWithLRU) { + MemoryLimitPool::get_instance().init(100 * 1024UL * 1024UL); #ifdef __ANDROID__ GTEST_SKIP() << "Skipped on Android: requires ~6GB memory/disk (emulator limit)"; #endif @@ -190,7 +192,7 @@ TEST_F(FlatStreamerTest, TestLinearSearchWithLRU) { auto ctx = write_streamer->create_context(); ASSERT_TRUE(!!ctx); - size_t cnt = 1000000UL; + size_t cnt = 50000UL; IndexQueryMeta qmeta(IndexMeta::DT_FP32, dim); for (size_t i = 0; i < cnt; i++) { NumericalVector vec(dim); diff --git a/tests/core/algorithm/flat/flat_streamer_buffer_time_test.cc b/tests/core/algorithm/flat/flat_streamer_buffer_time_test.cc index a76d5c57..a3c00632 100644 --- a/tests/core/algorithm/flat/flat_streamer_buffer_time_test.cc +++ b/tests/core/algorithm/flat/flat_streamer_buffer_time_test.cc @@ -137,6 +137,7 @@ TEST_F(FlatStreamerTest, TestLinearSearchMMap) { } TEST_F(FlatStreamerTest, TestLinearSearchBuffer) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); IndexStreamer::Pointer write_streamer = IndexFactory::CreateStreamer("FlatStreamer"); ASSERT_TRUE(write_streamer != nullptr); diff --git a/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc b/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc index 6f111a4b..30f9d7cb 100644 --- a/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc +++ b/tests/core/algorithm/hnsw/hnsw_streamer_buffer_test.cc @@ -48,6 +48,7 @@ void HnswStreamerTest::TearDown(void) { } TEST_F(HnswStreamerTest, TestHnswSearch) { + MemoryLimitPool::get_instance().init(2 * 1024UL * 1024UL * 1024UL); IndexStreamer::Pointer write_streamer = IndexFactory::CreateStreamer("HnswStreamer"); ASSERT_TRUE(write_streamer != nullptr); diff --git a/tests/core/interface/index_interface_test.cc b/tests/core/interface/index_interface_test.cc index 4d1aefd0..4016d31a 100644 --- a/tests/core/interface/index_interface_test.cc +++ b/tests/core/interface/index_interface_test.cc @@ -22,6 +22,7 @@ #include "core/algorithm/hnsw_rabitq/rabitq_converter.h" #include "zvec/core/framework/index_provider.h" #endif +#include #include "zvec/ailego/buffer/buffer_manager.h" #include "zvec/core/interface/index.h" #include "zvec/core/interface/index_factory.h" @@ -155,6 +156,7 @@ TEST(IndexInterface, General) { } TEST(IndexInterface, BufferGeneral) { + zvec::ailego::MemoryLimitPool::get_instance().init(100 * 1024 * 1024); constexpr uint32_t kDimension = 64; const std::string index_name{"test.index"}; @@ -261,7 +263,7 @@ TEST(IndexInterface, BufferGeneral) { .with_fetch_vector(true) .with_ef_search(20) .build()); - zvec::ailego::BufferManager::Instance().cleanup(); + // zvec::ailego::BufferManager::Instance().cleanup(); } diff --git a/tests/db/index/column/vector_column_indexer_test.cc b/tests/db/index/column/vector_column_indexer_test.cc index cbaf2d50..b798e8de 100644 --- a/tests/db/index/column/vector_column_indexer_test.cc +++ b/tests/db/index/column/vector_column_indexer_test.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include "db/index/column/vector_column/vector_column_params.h" #include "tests/test_util.h" #include "zvec/ailego/utility/float_helper.h" @@ -2136,6 +2137,7 @@ TEST(VectorColumnIndexerTest, Failure) { // Test case 10: use_mmap = false { + zvec::ailego::MemoryLimitPool::get_instance().init(10 * 1024UL * 1024UL); auto indexer = std::make_shared( index_file_path, FieldSchema("test", DataType::VECTOR_FP32, 3, false, diff --git a/tests/db/index/segment/segment_test.cc b/tests/db/index/segment/segment_test.cc index 9530b8cf..770aed61 100644 --- a/tests/db/index/segment/segment_test.cc +++ b/tests/db/index/segment/segment_test.cc @@ -38,6 +38,7 @@ #include "db/index/storage/wal/wal_file.h" #include "utils/utils.h" #include "zvec/db/options.h" +#include using namespace zvec; @@ -49,7 +50,7 @@ class SegmentTest : public testing::TestWithParam { FileHelper::RemoveDirectory(col_path); FileHelper::CreateDirectory(col_path); - ailego::BufferManager::Instance().init(MIN_MEMORY_LIMIT_BYTES, 1); + zvec::ailego::MemoryLimitPool::get_instance().init(MIN_MEMORY_LIMIT_BYTES); std::string idmap_path = FileHelper::MakeFilePath(col_path, FileID::ID_FILE, 0); diff --git a/tests/db/index/storage/bufferpool_store_test.cc b/tests/db/index/storage/bufferpool_store_test.cc index 9d4ba188..3ea9024c 100644 --- a/tests/db/index/storage/bufferpool_store_test.cc +++ b/tests/db/index/storage/bufferpool_store_test.cc @@ -34,7 +34,7 @@ class BufferPoolStoreTest : public testing::Test { std::cout << "err: " << s.message() << std::endl; exit(1); } - ailego::BufferManager::Instance().init(10 * 1024 * 1024, 1); + zvec::ailego::MemoryLimitPool::get_instance().init(10 * 1024 * 1024); } void TearDown() override {