7 changes: 5 additions & 2 deletions bindings/python/include/svs/python/manager.h
@@ -44,7 +44,10 @@ pybind11::tuple py_search(
matrix_view(result_idx), matrix_view(result_dists)
);

svs::index::search_batch_into(self, q_result, query_data.cview());
{
pybind11::gil_scoped_release release;
svs::index::search_batch_into(self, q_result, query_data.cview());
}
return pybind11::make_tuple(result_idx, result_dists);
}

@@ -86,7 +89,7 @@ void add_threading_interface(pybind11::class_<Manager>& manager) {
"num_threads",
&Manager::get_num_threads,
[](Manager& self, int num_threads) {
self.set_threadpool(svs::threads::DefaultThreadPool(num_threads));
self.set_threadpool(svs::threads::SwitchNativeThreadPool(num_threads));
},
"Read/Write (int): Get and set the number of threads used to process queries."
);
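Editorial note on the pattern above: pybind11 code must not touch Python objects while the GIL is released, which is why the diff extracts views into locals before entering the gil_scoped_release block. A minimal sketch of the two idioms this PR uses; the module name and example_native_work are illustrative stand-ins, not SVS API:

#include <pybind11/pybind11.h>

namespace py = pybind11;

// Stand-in for a long-running, GIL-free native routine (e.g. a batch search).
void example_native_work(int iterations) {
    volatile long sink = 0;
    for (int i = 0; i < iterations; ++i) {
        sink += i;
    }
}

PYBIND11_MODULE(gil_example, m) {
    // Idiom 1: scoped release around only the native section. Extract data
    // from Python objects before the block; build the return value after it.
    m.def("run_scoped", [](int n) {
        {
            py::gil_scoped_release release; // GIL dropped for this block only
            example_native_work(n);
        }
        return n; // GIL re-acquired here
    });

    // Idiom 2: call_guard releases the GIL for the entire bound call. Safe
    // only because example_native_work never touches Python state.
    m.def("run_guarded", &example_native_work, py::call_guard<py::gil_scoped_release>());
}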
27 changes: 23 additions & 4 deletions bindings/python/src/dynamic_vamana.cpp
@@ -174,7 +174,12 @@ void add_points(
"Expected IDs to be the same length as the number of rows in points!"
);
}
index.add_points(data_view(py_data), std::span(ids.data(), ids.size()), reuse_empty);
auto data = data_view(py_data);
auto id_span = std::span(ids.data(), ids.size());
{
py::gil_scoped_release release;
index.add_points(data, id_span, reuse_empty);
}
}

const char* ADD_POINTS_DOCSTRING = R"(
@@ -381,8 +386,18 @@ void wrap(py::module& m) {

add_dynamic_vamana_properties(vamana);

vamana.def("consolidate", &svs::DynamicVamana::consolidate, CONSOLIDATE_DOCSTRING);
vamana.def("compact", &svs::DynamicVamana::compact, COMPACT_DOCSTRING);
vamana.def(
"consolidate",
&svs::DynamicVamana::consolidate,
py::call_guard<py::gil_scoped_release>(),
CONSOLIDATE_DOCSTRING
);
vamana.def(
"compact",
&svs::DynamicVamana::compact,
py::call_guard<py::gil_scoped_release>(),
COMPACT_DOCSTRING
);

// Reloading
vamana.def(
@@ -435,7 +450,11 @@ void wrap(py::module& m) {
vamana.def(
"delete",
[](svs::DynamicVamana& index, const py_contiguous_array_t<size_t>& ids) {
index.delete_points(as_span(ids));
auto id_span = as_span(ids);
{
py::gil_scoped_release release;
index.delete_points(id_span);
}
},
py::arg("ids"),
DELETE_DOCSTRING
2 changes: 1 addition & 1 deletion bindings/python/src/python_bindings.cpp
@@ -156,7 +156,7 @@ class ScopedModuleNameOverride {

} // namespace

PYBIND11_MODULE(_svs, m) {
PYBIND11_MODULE(_svs, m, py::mod_gil_not_used()) {
// Internally, the top level `__init__.py` imports everything from the C++ module named
// `_svs`.
//
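For context on the one-line change above: py::mod_gil_not_used() declares that the extension supports free-threaded (PEP 703) CPython, so the interpreter does not re-enable the GIL when importing it; on GIL-enabled builds it is a no-op. It requires pybind11 2.13 or newer. A minimal sketch with an illustrative module name:

#include <pybind11/pybind11.h>

namespace py = pybind11;

// On free-threaded CPython (3.13t+), importing this module keeps the GIL
// disabled; the author thereby promises the bound code is itself thread-safe.
PYBIND11_MODULE(ft_example, m, py::mod_gil_not_used()) {
    m.def("add", [](int a, int b) { return a + b; });
}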
22 changes: 16 additions & 6 deletions include/svs/concepts/graph.h
@@ -50,6 +50,15 @@

namespace svs::graphs {

/// Outcome of `MemoryGraph::add_edge(src, dst)`. Distinguishes three cases so callers
/// can route dropped edges (e.g. to a backedge buffer) without a TOCTOU race between a
/// pre-check and the insert.
enum class AddEdgeResult : uint8_t {
Added, // Edge was inserted.
AlreadyExists, // Edge was already present (or self-loop). Not inserted.
Full, // Node's adjacency list is at max_degree. Edge NOT inserted.
};

// clang-format off

///
@@ -135,12 +144,13 @@ template <ImmutableMemoryGraph G> using index_type_t = typename G::index_type;
/// @code{.cpp}
/// template <typename T>
/// concept MemoryGraph = requires(T& g, const T& const_g) {
/// // Add an edge to the graph.
/// // Must return the out degree of `src` after adding the edge `src -> dst`.
/// // If adding the edge would result in the graph exceeding its maximum degree,
/// // implementations are free to not add this edge.
/// // Add an edge to the graph atomically. Returns an AddEdgeResult indicating:
/// // Added - edge was inserted
/// // AlreadyExists - edge was already present (or self-loop); no insert
/// // Full - node is at max_degree; edge NOT inserted (caller should
/// // route to an overflow buffer if needed)
/// requires requires(index_type_t<T> src, index_type_t<T> dst) {
/// { g.add_edge(src, dst) } -> std::convertible_to<size_t>;
/// { g.add_edge(src, dst) } -> std::convertible_to<AddEdgeResult>;
/// };
///
/// // Completely clear the adjacency list for vertex ``i``.
@@ -164,7 +174,7 @@ template <typename T>
concept MemoryGraph = requires(T& g, const T& const_g) {
// Adding an edge.
requires requires(index_type_t<T> src, index_type_t<T> dst) {
{ g.add_edge(src, dst) } -> std::convertible_to<size_t>;
{ g.add_edge(src, dst) } -> std::convertible_to<AddEdgeResult>;
};

// Clear adjacency list.
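A hedged sketch of the caller-side routing the AddEdgeResult comment describes; the overflow buffer and the helper name are illustrative assumptions, not SVS API:

#include <utility>
#include <vector>

// Assumes G models svs::graphs::MemoryGraph as specified above.
template <svs::graphs::MemoryGraph G>
void add_edge_or_defer(
    G& graph,
    typename G::index_type src,
    typename G::index_type dst,
    std::vector<std::pair<typename G::index_type, typename G::index_type>>& overflow
) {
    switch (graph.add_edge(src, dst)) {
        case svs::graphs::AddEdgeResult::Added:
        case svs::graphs::AddEdgeResult::AlreadyExists:
            break; // Edge is present (or was a self-loop); nothing to route.
        case svs::graphs::AddEdgeResult::Full:
            // Because the result comes from the insert itself, there is no
            // TOCTOU window between a degree pre-check and the insertion.
            overflow.emplace_back(src, dst);
            break;
    }
}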
133 changes: 99 additions & 34 deletions include/svs/core/graph/graph.h
@@ -20,8 +20,12 @@
#include "svs/core/data/simple.h"
#include "svs/lib/algorithms.h"
#include "svs/lib/boundscheck.h"
#include "svs/lib/concurrency/atomic_span.h"
#include "svs/lib/concurrency/seqlock.h"
#include "svs/lib/saveload.h"
#include "svs/lib/spinlock.h"

#include <atomic>
#include <cassert>
#include <cstdint>
#include <span>
@@ -57,12 +61,12 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
/// The integer representation used to represent vertices in this graph.
using index_type = Idx;
using value_type = std::span<Idx>;
using const_value_type = std::span<const Idx>;
using const_value_type = AtomicSpan<const Idx>;
Member:
For better flexibility, and to let the user select a synchronized or non-synchronized index kind, I would define a dedicated SynchronizedGraphBase class.

Contributor Author:
I am not sure this duplication is really required. The performance penalty for synchronized vs. non-synchronized search is only a few percent.

Contributor:
Hi @razdoburdin, is duplication too bad in this case? Otherwise, @rfsaliev may have a point on flexibility, and it's always better if performance is not affected. But I agree with you that the pros and cons should be discussed if duplication is a large overhead.

Contributor Author:
I plan to investigate the trade-offs after the design of the concurrent path is finalized. Let's make sure it works well first.

Member:
To get valuable results, I would recommend benchmarking the 'static' VamanaIndex with and without the synchronized graph on different platforms, especially on multi-socket systems.
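For reference, a minimal sketch of the split the reviewers are weighing: selecting a synchronized or plain graph base at compile time, so single-threaded users pay no atomic or seqlock cost. All names below are illustrative, not SVS API:

#include <cstdint>

// Illustrative only; SVS currently has a single SimpleGraphBase.
enum class Synchronization { None, SeqLock };

template <typename Idx, Synchronization S> class GraphBase;

template <typename Idx> class GraphBase<Idx, Synchronization::None> {
    // Plain loads and stores; no per-node locks or sequence counters.
};

template <typename Idx> class GraphBase<Idx, Synchronization::SeqLock> {
    // Per-node SpinLock + seqlock validation, as in this PR's SimpleGraphBase.
};

// Each threading model picks its variant:
using SingleThreadedGraph = GraphBase<uint32_t, Synchronization::None>;
using ConcurrentGraph = GraphBase<uint32_t, Synchronization::SeqLock>;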


/// Type used to represent mutable adjacency lists externally.
using reference = std::span<Idx>;
/// Type used to represent constant adjacency lists externally.
using const_reference = std::span<const Idx>;
using const_reference = AtomicSpan<const Idx>;

///
/// @brief Construct an empty graph of the desired size.
@@ -75,7 +79,9 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
///
explicit SimpleGraphBase(size_t num_nodes, size_t max_degree)
: data_{num_nodes, max_degree + 1}
, max_degree_{lib::narrow<Idx>(max_degree)} {
, max_degree_{lib::narrow<Idx>(max_degree)}
, seq_counters_(num_nodes)
, node_locks_(num_nodes) {
reset();
}

@@ -85,15 +91,19 @@
size_t num_nodes, size_t max_degree, const Allocator& allocator
)
: data_{num_nodes, max_degree + 1, allocator}
, max_degree_{lib::narrow<Idx>(max_degree)} {
, max_degree_{lib::narrow<Idx>(max_degree)}
, seq_counters_(num_nodes)
, node_locks_(num_nodes) {
reset();
}

explicit SimpleGraphBase(data_type data)
: data_{std::move(data)}
, max_degree_{lib::narrow<Idx>(data_.dimensions() - 1)} {}
, max_degree_{lib::narrow<Idx>(data_.dimensions() - 1)}
, seq_counters_(data_.size())
, node_locks_(data_.size()) {}

const_reference raw_row(Idx i) const { return data_.get_datum(i); }
std::span<const Idx> raw_row(Idx i) const { return data_.get_datum(i); }

///
/// @brief Return the outward adjacency list for vertex ``i``.
@@ -103,14 +113,17 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
const_reference get_node(Idx i) const {
// Get the raw data.
std::span<const Idx> raw_data = data_.get_datum(i);
auto num_neighbors = raw_data.front();
Idx num_neighbors =
std::atomic_ref<const Idx>(raw_data.front()).load(std::memory_order_relaxed);
// Clamp to max_degree to safely handle torn reads of the length field.
num_neighbors = std::min(num_neighbors, max_degree_);

// Maybe prefetch the rest of the adjacncy list.
// Maybe prefetch the rest of the adjacency list.
size_t bytes = (1 + num_neighbors) * sizeof(Idx);
if (bytes > lib::CACHELINE_BYTES) {
lib::prefetch(std::as_bytes(raw_data).subspan(lib::CACHELINE_BYTES));
}
return raw_data.subspan(1, num_neighbors);
return AtomicSpan<const Idx>(raw_data.data() + 1, num_neighbors);
}

///
@@ -119,16 +132,28 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
/// Complexity: Linear in the maximum degree.
///
bool has_edge(Idx src, Idx dst) const {
const auto& list = get_node(src);
auto begin = list.begin();
auto end = list.end();
return (std::find(begin, end, dst) != end);
for (;;) {
auto maybe_seq = seq_counters_[src].read_begin();
if (!maybe_seq) {
detail::pause();
continue;
}
const auto& list = get_node(src);
bool found = (std::find(list.begin(), list.end(), dst) != list.end());
if (seq_counters_[src].read_validate(*maybe_seq)) {
return found;
}
detail::pause();
}
}

///
/// @brief Return the current out degree of vertex ``i``.
///
size_t get_node_degree(Idx i) const { return data_.get_datum(i).front(); }
size_t get_node_degree(Idx i) const {
return std::atomic_ref<const Idx>(data_.get_datum(i).front())
.load(std::memory_order_relaxed);
}

///
/// @brief Prefetch the adjacency list for node ``i`` into the L1 cache.
@@ -144,8 +169,11 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
/// The complexity of this operation is `O(1)`.
///
void clear_node(Idx i) {
Idx& num_neighbors = data_.get_datum(i).front();
num_neighbors = 0;
std::lock_guard lock{node_locks_[i]};
auto seq = seq_counters_[i].begin_write();
std::atomic_ref<Idx>(data_.get_datum(i).front())
.store(0, std::memory_order_relaxed);
seq_counters_[i].end_write(seq);
}

///
@@ -177,6 +205,7 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap

/// @copydoc replace_node(Idx,const std::vector<Idx>&)
void replace_node(Idx i, std::span<const Idx> new_neighbors) {
std::lock_guard lock{node_locks_[i]};
std::span<Idx> raw_data = data_.get_datum(i);

// Clamp the number of elements to copy to the maximum out degree to correctly
@@ -186,13 +215,31 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
Idx elements_to_copy =
std::min(max_degree_, lib::narrow_cast<Idx>(new_neighbors.size()));

std::span<const Idx> adjusted_neighbors = new_neighbors.first(elements_to_copy);
value_type adjacency_list = raw_data.subspan(1, elements_to_copy);
auto seq = seq_counters_[i].begin_write();
for (Idx j = 0; j < elements_to_copy; ++j) {
std::atomic_ref<Idx>(raw_data[1 + j])
.store(new_neighbors[j], std::memory_order_relaxed);
}
std::atomic_ref<Idx>(raw_data[0])
.store(elements_to_copy, std::memory_order_relaxed);
seq_counters_[i].end_write(seq);
}

std::copy(
adjusted_neighbors.begin(), adjusted_neighbors.end(), adjacency_list.begin()
);
raw_data.front() = elements_to_copy;
/// @copydoc replace_node(Idx,const std::vector<Idx>&)
void replace_node(Idx i, AtomicSpan<const Idx> new_neighbors) {
std::lock_guard lock{node_locks_[i]};
std::span<Idx> raw_data = data_.get_datum(i);
Idx elements_to_copy =
std::min(max_degree_, lib::narrow_cast<Idx>(new_neighbors.size()));

auto seq = seq_counters_[i].begin_write();
for (Idx j = 0; j < elements_to_copy; ++j) {
std::atomic_ref<Idx>(raw_data[1 + j])
.store(new_neighbors[j], std::memory_order_relaxed);
}
std::atomic_ref<Idx>(raw_data[0])
.store(elements_to_copy, std::memory_order_relaxed);
seq_counters_[i].end_write(seq);
}

///
@@ -208,10 +255,10 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
/// * ``get_node_degree(src) == max_degree()`` (adjacency list is already full)
/// * ``dst`` is already an out-neighbor of ``src``.
///
size_t add_edge(Idx src, Idx dst) {
AddEdgeResult add_edge(Idx src, Idx dst) {
// Don't assign a node as its own neighbor.
if (src == dst) {
return get_node_degree(src);
return AddEdgeResult::AlreadyExists;
}

if constexpr (checkbounds_v) {
@@ -225,11 +272,15 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
}
}

// Acquire lock — all reads and writes under the lock to prevent
// concurrent writers from seeing stale state.
std::lock_guard lock{node_locks_[src]};

// Check if there's room for the new node.
std::span<Idx> raw_data = data_.get_datum(src);
Idx current_size = raw_data.front();
if (current_size == max_degree_) {
return current_size;
return AddEdgeResult::Full;
}

// At this point, we know there is room.
@@ -248,17 +299,22 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
auto it = std::find(begin, end - 1, dst);
// auto it = std::lower_bound(begin, end - 1, dst);
if (it != end - 1 && (*it == dst)) {
return current_size;
return AddEdgeResult::AlreadyExists;
}

// Insert at the new location.
std::copy_backward(it, end - 1, end);
(*it) = dst;
auto seq = seq_counters_[src].begin_write();

// // Assign the new edge and update the number of neighbors.
// adjacency_list.back() = dst;
raw_data.front() = new_size;
return new_size;
// Insert at the new location using atomic stores.
for (auto dst_it = end - 1, src_it = end - 2; dst_it != it; --dst_it, --src_it) {
std::atomic_ref<Idx>(*dst_it).store(*src_it, std::memory_order_relaxed);
}
std::atomic_ref<Idx>(*it).store(dst, std::memory_order_relaxed);

// Update the number of neighbors.
std::atomic_ref<Idx>(raw_data.front()).store(new_size, std::memory_order_relaxed);

seq_counters_[src].end_write(seq);
return AddEdgeResult::Added;
}

/// Return the maximum out-degree this graph is capable of containing.
@@ -270,9 +326,16 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
data_type& get_data() { return data_; }

// Resizeable API
void unsafe_resize(size_t new_size) { data_.resize(new_size); }
void unsafe_resize(size_t new_size) {
data_.resize(new_size);
seq_counters_.resize(new_size);
node_locks_.resize(new_size);
}
void add_node() { unsafe_resize(n_nodes() + 1); }

/// @brief Access the per-node sequence lock counters for concurrent read validation.
const SeqLockArray& seq_counters() const { return seq_counters_; }

///// Saving
static constexpr lib::Version save_version = lib::Version(0, 0, 0);
static constexpr std::string_view serialization_schema = "default_graph";
@@ -370,6 +433,8 @@ template <std::unsigned_integral Idx, data::MemoryDataset Data> class SimpleGrap
protected:
data_type data_;
Idx max_degree_;
SeqLockArray seq_counters_;
std::vector<SpinLock> node_locks_;
};

/////
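Editorial aside: the concurrency scheme threaded through graph.h pairs a per-node SpinLock (writer mutual exclusion) with a per-node sequence counter (optimistic, retry-on-conflict reads). Below is a simplified, self-contained sketch of that protocol, matching the read_begin/read_validate/begin_write/end_write interface visible in the diff but using a plain std::atomic counter; note that the protected data itself must also be accessed atomically (as the diff does via std::atomic_ref) to avoid data races:

#include <atomic>
#include <cstdint>
#include <optional>

class SeqLockSketch {
  public:
    // Reader: returns the counter if no write is in flight (even value).
    std::optional<uint64_t> read_begin() const {
        uint64_t seq = seq_.load(std::memory_order_acquire);
        if (seq % 2 != 0) {
            return std::nullopt; // writer active; caller should pause and retry
        }
        return seq;
    }

    // Reader: true iff no writer started or finished since read_begin().
    bool read_validate(uint64_t seq) const {
        std::atomic_thread_fence(std::memory_order_acquire);
        return seq_.load(std::memory_order_relaxed) == seq;
    }

    // Writer: bump to odd before mutating. The caller must already hold an
    // external lock (the per-node SpinLock here) so writers never overlap.
    uint64_t begin_write() { return seq_.fetch_add(1, std::memory_order_acq_rel) + 1; }

    // Writer: bump back to even after mutating; readers that raced will retry.
    void end_write(uint64_t) { seq_.fetch_add(1, std::memory_order_release); }

  private:
    std::atomic<uint64_t> seq_{0};
};

// Reader loop, mirroring SimpleGraphBase::has_edge above:
//   for (;;) {
//       auto seq = lock.read_begin();
//       if (!seq) { /* pause */ continue; }
//       auto result = read_fields_with_atomic_ref();
//       if (lock.read_validate(*seq)) return result;
//   }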