Skip to content
10 changes: 10 additions & 0 deletions cpp/bifrost/async_prefetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class async_prefetcher
void start();
void stop() noexcept;

/**
* @brief Wait for the first batch to be ready (for cold run optimization).
* @param timeout_ms Maximum time to wait in milliseconds.
*
* This method is used for eager prefetching during cold runs.
* It fetches and caches the first batch so that subsequent next_batch()
* calls return immediately without blocking.
*/
void wait_for_first_batch(int64_t timeout_ms = 30000);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing implementation: wait_for_first_batch() is declared but not implemented in any .cpp file. This will cause linker errors when the code is built.

Fix: Implement this method in async_prefetcher.cpp or mark it as = delete if it's not meant to be used yet.


bool is_started() const noexcept;

heimdall::dataset_view_ptr dataset() const noexcept;
Expand Down
19 changes: 19 additions & 0 deletions cpp/bifrost/column_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ class column_streamer
return b.columns()[0].array();
}

/**
* @brief Returns the async promise for the next batch (for parallel warming).
*/
/**
 * @brief Asynchronously obtain the next batch's single column as an nd::array.
 *
 * Non-blocking counterpart to next_batch(): returns a promise so that callers
 * can kick off fetches for several streamers in parallel (e.g. cold-run
 * warming) and resolve them later.
 */
async::promise<nd::array> next_batch_async()
{
    // Continuation that unwraps the one-column batch into its backing array.
    auto extract_column = [](deeplake_core::batch fetched) {
        return fetched.columns()[0].array();
    };
    return prefetcher_.next_batch_async().then(extract_column);
}

/**
* @brief Pre-fetch and cache the first batch for cold run optimization.
* @param timeout_ms Maximum time to wait in milliseconds.
*/
/**
 * @brief Eagerly fetch and cache the first batch (cold-run optimization).
 *
 * Blocks until the prefetcher has the first batch ready — or the timeout
 * elapses — so that a subsequent next_batch() call returns without waiting.
 *
 * @param timeout_ms Upper bound on the wait, in milliseconds (default 30s).
 */
void ensure_first_batch_ready(int64_t timeout_ms = 30000)
{
    // Delegate directly to the underlying prefetcher.
    this->prefetcher_.wait_for_first_batch(timeout_ms);
}

bool empty() const noexcept
{
return prefetcher_.size() == 0;
Expand Down
5 changes: 5 additions & 0 deletions cpp/deeplake_pg/deeplake_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void analyze_plan(PlannedStmt* plan)
}
}
}

// Warm all streamers in parallel for cold run optimization
if (pg::eager_batch_prefetch) {
table_data->get_streamers().warm_all_streamers();
}
}
pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake);
}
Expand Down
41 changes: 39 additions & 2 deletions cpp/deeplake_pg/duckdb_deeplake_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,10 +988,47 @@ void deeplake_scan_function(duckdb::ClientContext& context, duckdb::TableFunctio
deeplake_scan_function_helper helper(context, data, output);
try {
helper.scan();
} catch (const duckdb::OutOfMemoryException& e) {
// Provide helpful error message with configuration hints for OOM
elog(ERROR,
"DuckDB out of memory during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} catch (const duckdb::Exception& e) {
elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what());
// Check if the error message indicates memory issues
std::string msg = e.what();
std::string msg_lower;
msg_lower.reserve(msg.size());
for (char c : msg) {
msg_lower.push_back(static_cast<char>(std::tolower(static_cast<unsigned char>(c))));
}
if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) {
elog(ERROR,
"DuckDB memory error during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} else {
elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what());
}
} catch (const std::exception& e) {
elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
// Check if the error message indicates memory issues
std::string msg = e.what();
std::string msg_lower;
msg_lower.reserve(msg.size());
for (char c : msg) {
msg_lower.push_back(static_cast<char>(std::tolower(static_cast<unsigned char>(c))));
}
if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) {
elog(ERROR,
"Memory error during Deeplake scan: %s. "
"Consider increasing pg_deeplake.duckdb_memory_limit_mb or "
"setting pg_deeplake.duckdb_temp_directory for disk spilling.",
e.what());
} else {
elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
}
} catch (...) {
elog(ERROR, "Unknown exception during Deeplake scan");
}
Expand Down
52 changes: 52 additions & 0 deletions cpp/deeplake_pg/duckdb_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "table_storage.hpp"
#include "utils.hpp"

#include <base/system_report.hpp>

#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -74,6 +76,56 @@ std::unique_ptr<duckdb_connections> create_connections()
// Register the deeplake_scan table function for zero-copy access
pg::register_deeplake_scan_function(*(conns->con_cpp));

// Configure DuckDB memory management for large operations (e.g., JOINs at SF100+)
// This prevents segfaults during memory-intensive operations by enabling disk spilling
//
// Memory configuration:
// - If duckdb_memory_limit_mb > 0, use the explicit setting (in MB)
// - Otherwise, auto-detect using 80% of system memory with 256MB minimum floor
// - For containerized environments with cgroup limits, auto-detection may use host
// memory instead of container limits; set pg_deeplake.duckdb_memory_limit_mb explicitly
//
// All memory values use MB units consistently throughout this codebase
uint64_t mem_limit_mb = 0;
if (pg::duckdb_memory_limit_mb > 0) {
mem_limit_mb = static_cast<uint64_t>(pg::duckdb_memory_limit_mb);
} else {
// Auto-detect: use 80% of system memory
uint64_t total_bytes = base::system_report::total_memory();
mem_limit_mb = static_cast<uint64_t>(total_bytes * 0.8 / (1024ULL * 1024ULL));
if (mem_limit_mb < 256) {
mem_limit_mb = 256; // Minimum floor of 256MB
}
}

// Apply memory limit to DuckDB
auto mem_result = conns->con_cpp->Query(fmt::format("SET memory_limit='{}MB'", mem_limit_mb));
if (!mem_result || mem_result->HasError()) {
elog(WARNING, "Failed to set DuckDB memory_limit: %s",
mem_result ? mem_result->GetError().c_str() : "null result");
}

// Configure temp directory for disk spilling (if specified)
if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) {
auto temp_result = conns->con_cpp->Query(
fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: User-controlled duckdb_temp_directory is directly interpolated into SQL query without escaping. A malicious path like '; DROP TABLE users; -- could execute arbitrary SQL.

Fix: DuckDB's `SET` statement does not accept bound parameters, so escape the user-supplied path before interpolation — e.g. double any embedded single quotes:

Suggested change
fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory));
std::string escaped = duckdb::StringUtil::Replace(pg::duckdb_temp_directory, "'", "''");
auto temp_result = conns->con_cpp->Query(
    fmt::format("SET temp_directory='{}'", escaped));

if (!temp_result || temp_result->HasError()) {
elog(WARNING, "Failed to set DuckDB temp_directory: %s",
temp_result ? temp_result->GetError().c_str() : "null result");
}
}

// Log DuckDB settings at INFO level for operational visibility
auto verify_mem = conns->con_cpp->Query("SELECT current_setting('memory_limit')");
if (verify_mem && !verify_mem->HasError() && verify_mem->RowCount() > 0) {
elog(INFO, "DuckDB memory_limit configured: %s", verify_mem->GetValue(0, 0).ToString().c_str());
}

auto verify_temp = conns->con_cpp->Query("SELECT current_setting('temp_directory')");
if (verify_temp && !verify_temp->HasError() && verify_temp->RowCount() > 0) {
elog(INFO, "DuckDB temp_directory configured: %s", verify_temp->GetValue(0, 0).ToString().c_str());
}

// For now, we'll use C++ API for queries since table functions require it
// The C API connection will be used later when we can restructure to avoid table functions
// or when DuckDB provides a way to register table functions via C API
Expand Down
54 changes: 54 additions & 0 deletions cpp/deeplake_pg/extension_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ bool use_shared_mem_for_refresh = false;
bool enable_dataset_logging = false; // Enable dataset operation logging for debugging
bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options
bool stateless_enabled = false; // Enable stateless catalog sync across instances
bool eager_batch_prefetch = true; // Enable eager prefetch of first batch for cold run optimization

// DuckDB memory management - controls memory_limit and temp_directory for large operations
int32_t duckdb_memory_limit_mb = 0; // 0 = auto-detect (80% of system memory)
char* duckdb_temp_directory = nullptr; // nullptr/empty = DuckDB's default temp location

} // namespace pg

Expand Down Expand Up @@ -245,6 +250,19 @@ void initialize_guc_parameters()
nullptr,
nullptr);

DefineCustomBoolVariable("pg_deeplake.eager_batch_prefetch",
"Enable eager prefetch of first batch for cold run optimization.",
"When enabled, the first batch of data for all columns is prefetched in parallel "
"when a scan begins. This significantly improves cold run performance by overlapping "
"the initial data fetches for multiple columns.",
&pg::eager_batch_prefetch,
true,
PGC_USERSET,
0,
nullptr,
nullptr,
nullptr);

DefineCustomBoolVariable("pg_deeplake.enable_dataset_logging",
"Enable operation logging for deeplake datasets.",
"When enabled, all dataset operations (append_row, update_row, delete_row, etc.) "
Expand Down Expand Up @@ -277,6 +295,42 @@ void initialize_guc_parameters()
);


// DuckDB memory management GUC parameters
// These control DuckDB's internal memory budget for large operations like JOINs
DefineCustomIntVariable(
"pg_deeplake.duckdb_memory_limit_mb",
"Memory limit for DuckDB's internal operations in MB.",
"Controls DuckDB's memory budget for large operations like JOINs and aggregations. "
"Set to 0 (default) for auto-detection using 80% of system memory. "
"When the limit is exceeded, DuckDB spills to disk using temp_directory. "
"For containerized environments with cgroup limits, set this explicitly as "
"auto-detection may use host memory instead of container limits.",
&pg::duckdb_memory_limit_mb, // linked C variable
0, // default value (0 = auto-detect)
0, // min value (0 = unlimited/auto)
INT_MAX, // max value
PGC_USERSET, // context - can be set by any user
GUC_UNIT_MB, // flags - treat as MB
nullptr, // check_hook
nullptr, // assign_hook
nullptr // show_hook
);

DefineCustomStringVariable(
"pg_deeplake.duckdb_temp_directory",
"Temporary directory for DuckDB disk spilling during large operations.",
"Specifies where DuckDB writes temporary files when memory_limit is exceeded. "
"Empty string (default) uses DuckDB's default temp location. "
"DuckDB will validate the path at runtime and fail gracefully if invalid.",
&pg::duckdb_temp_directory, // linked C variable
"", // default value (empty = DuckDB default)
PGC_USERSET, // context - can be set by any user
0, // flags
nullptr, // check_hook
nullptr, // assign_hook
nullptr // show_hook
);

// Initialize PostgreSQL memory tracking
pg::memory_tracker::initialize_guc_parameters();

Expand Down
11 changes: 11 additions & 0 deletions cpp/deeplake_pg/table_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,12 @@ double deeplake_index_build_range_scan(Relation heap_rel,
td.create_streamer(attnum, -1);
}
}

// Warm all streamers in parallel for cold run optimization
if (pg::eager_batch_prefetch) {
td.get_streamers().warm_all_streamers();
}

std::vector<Datum> values(nkeys, 0);
std::vector<uint8_t> nulls(nkeys, 0);
pg::table_scan tscan(table_id, false, false);
Expand Down Expand Up @@ -728,6 +734,11 @@ TableScanDesc deeplake_table_am_routine::scan_begin(Relation relation,
}
}

// Warm all streamers in parallel for cold run optimization
if (pg::eager_batch_prefetch) {
td.get_streamers().warm_all_streamers();
}

if (nkeys > 0) {
extended_scan->scan_state.nkeys = nkeys;
// copy ScanKeyData because Postgres only gave us a pointer
Expand Down
12 changes: 12 additions & 0 deletions cpp/deeplake_pg/table_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,25 @@ struct table_data

std::vector<column_data> column_to_batches;
std::vector<std::unique_ptr<bifrost::column_streamer>> streamers;
std::vector<std::optional<nd::array>> first_batch_cache_;

/// @brief Drop all per-table scan state: cached column batches, active
/// streamers, and the eagerly prefetched first-batch cache.
///
/// NOTE(review): the clear order is preserved as-is — streamers may hold
/// references into batch data, so confirm there is no destruction-order
/// dependency before reordering these calls.
inline void reset() noexcept
{
    column_to_batches.clear();   // per-column batch bookkeeping
    streamers.clear();           // active bifrost column streamers
    first_batch_cache_.clear();  // cached first batches from eager prefetch
}

/**
* @brief Pre-warm all streamers by triggering parallel first batch downloads.
*
* This method initiates the download of the first batch for all active
* streamers in parallel, then waits for all downloads to complete.
* This significantly improves cold run performance by overlapping the
* initial data fetches.
*/
inline void warm_all_streamers();

inline nd::array get_sample(int32_t column_number, int64_t row_number);

template <typename T>
Expand Down
Loading
Loading