Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/atomdb/adapterdb/AdapterDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void AdapterDB::initialize(bool skip_atomdb_backend_empty) {
string type = config.at_path("adapterdb.type").get_or<string>("");
auto adapter_type = parse_adapter_db_type(type);

if (adapter_type != AdapterDbType::Postgres) {
if (adapter_type != AdapterDbType::Postgres && adapter_type != AdapterDbType::Mork) {
RAISE_ERROR("AdapterDB: Unsupported database type in config: " + type);
}

Expand Down
5 changes: 3 additions & 2 deletions src/atomdb/adapterdb/AdapterDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ using namespace db_adapter;

namespace atomdb {

enum class AdapterDbType { Postgres };
enum class AdapterDbType { Postgres, Mork };

inline AdapterDbType parse_adapter_db_type(const std::string& value) {
if (value == "postgres") return AdapterDbType::Postgres;
if (value == "mork") return AdapterDbType::Mork;
RAISE_ERROR("Unsupported adapterdb.type: " + value);
}
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
};
}


class AdapterDB : public AtomDB {
public:
Expand Down
2 changes: 2 additions & 0 deletions src/db_adapter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ cc_library(
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//metta:metta_lib",
],
)

Expand Down Expand Up @@ -96,6 +97,7 @@ cc_library(
"//atomdb",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//db_adapter/mork:mork_lib",
"//db_adapter/postgres:postgres_lib",
],
)
Expand Down
124 changes: 11 additions & 113 deletions src/db_adapter/ContextLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,121 +15,14 @@ using json = nlohmann::json;

namespace fs = std::filesystem;

vector<TableMapping> ContextLoader::load_table_file(const string& file_path) {
if (!fs::exists(file_path)) {
RAISE_ERROR("Context file " + file_path + " does not exist");
}

ifstream f(file_path);

json tables = json::parse(f);

vector<TableMapping> out;

bool has_error = false;

for (size_t i = 0; i < tables.size(); ++i) {
string msg_base = "table[" + to_string(i) + "]";

const json& table = tables[i];

TableMapping tm;

if (!table.contains("table_name")) {
LOG_ERROR(msg_base + " missing required key: 'table_name'");
has_error = true;
} else if (!table["table_name"].is_string()) {
LOG_ERROR(msg_base + ".table_name must be a string in a table entry");
has_error = true;
} else {
string tn = table["table_name"].get<string>();
size_t count_dot = 0;
for (char c : tn) {
if (c == '.') ++count_dot;
}
if (count_dot != 1) {
LOG_ERROR(msg_base + ".table_name must be in format 'schema.table'");
has_error = true;
} else {
size_t pos = tn.find('.');
if (pos == 0 || pos + 1 >= tn.size()) {
LOG_ERROR(msg_base + "table_name must be in format 'schema.table'");
has_error = true;
}
}
}

if (!table.contains("skip_columns")) {
LOG_ERROR(msg_base + " missing required key: 'skip_columns'");
has_error = true;
} else {
const json& sc = table["skip_columns"];
if (!sc.is_null()) {
if (!sc.is_array()) {
LOG_ERROR(msg_base +
".skip_columns must be an array of strings or null in a table entry");
has_error = true;
} else {
tm.skip_columns.emplace();
for (size_t k = 0; k < sc.size(); ++k) {
if (!sc[k].is_string()) {
LOG_ERROR(msg_base + ".skip_columns[" + to_string(k) +
"] must be a string in a table entry");
has_error = true;
} else {
tm.skip_columns->push_back(sc[k]);
}
}
}
}
}

if (!table.contains("where_clauses")) {
LOG_ERROR(msg_base + " missing required key: 'where_clauses'");
has_error = true;
} else {
const json& wc = table["where_clauses"];
if (!wc.is_null()) {
if (!wc.is_array()) {
LOG_ERROR(msg_base +
".where_clauses must be an array of strings or null in a table entry");
has_error = true;
} else {
tm.where_clauses.emplace();
for (size_t k = 0; k < wc.size(); ++k) {
if (!wc[k].is_string()) {
LOG_ERROR(msg_base + ".where_clauses[" + to_string(k) +
"] must be a string in a table entry");
has_error = true;
} else {
tm.where_clauses->push_back(wc[k]);
}
}
}
}
}

if (!has_error) {
tm.table_name = table["table_name"];
out.push_back(tm);
}
}

if (has_error) {
LOG_ERROR("Context file validation failed with errors. Please fix the issues and try again.");
return vector<TableMapping>{};
}
return out;
}

vector<string> ContextLoader::load_query_file(const string& file_path) {
vector<string> ContextLoader::load_sql_queries(const string& file_path) {
if (!fs::exists(file_path)) {
RAISE_ERROR("Query file " + file_path + " does not exist");
}

ifstream f(file_path);

vector<string> out;
vector<string> queries;
string query;
string line;

Expand All @@ -139,14 +32,19 @@ vector<string> ContextLoader::load_query_file(const string& file_path) {
if (!line.empty() && !is_comment) {
query += line + " ";
} else {
out.push_back(query);
queries.push_back(query);
query.clear();
}
}

if (!query.empty()) {
out.push_back(query);
queries.push_back(query);
}

return out;
}
return queries;
}

vector<string> ContextLoader::load_metta_queries(const string& file_path) {
RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet");
return {};
}
4 changes: 2 additions & 2 deletions src/db_adapter/ContextLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ using namespace db_adapter;

class ContextLoader {
public:
static vector<TableMapping> load_table_file(const string& file_path);
static vector<string> load_query_file(const string& file_path);
static vector<string> load_sql_queries(const string& file_path);
static vector<string> load_metta_queries(const string& file_path);
};
18 changes: 18 additions & 0 deletions src/db_adapter/DatabaseMapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "Hasher.h"
#include "Link.h"
#include "MettaMapping.h"
#include "MettaParser.h"
#include "Node.h"

#define LOG_LEVEL INFO_LEVEL
Expand Down Expand Up @@ -313,3 +314,20 @@ void SQL2AtomsMapper::map_foreign_keys_combinations(
}
}
}

// ==============================
// Construction / destruction
// ==============================

Mork2AtomsMapper::Mork2AtomsMapper() { RAISE_ERROR("Mork2AtomsMapper constructor not implemented yet"); }

Mork2AtomsMapper::~Mork2AtomsMapper() {}

// ==============================
// Public
// ==============================

const vector<Atom*> Mork2AtomsMapper::map(const DbInput& data) {
RAISE_ERROR("Mork2AtomsMapper::map() not implemented yet");
return vector<Atom*>{};
}
12 changes: 12 additions & 0 deletions src/db_adapter/DatabaseMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "DatabaseTypes.h"
#include "HandleTrie.h"
#include "MettaMapping.h"
#include "MettaParserActions.h"

using namespace std;
using namespace atoms;
Expand Down Expand Up @@ -85,4 +86,15 @@ class SQL2AtomsMapper : public Mapper {
void map_foreign_keys_combinations(const vector<tuple<string, string, string>>& all_foreign_keys);
};

class Mork2AtomsMapper : public Mapper {
public:
Mork2AtomsMapper();
~Mork2AtomsMapper() override;

const vector<Atom*> map(const DbInput& data) override;

private:
vector<Atom*> atoms;
shared_ptr<MettaParserActions> parser_actions;
};
} // namespace db_adapter
36 changes: 34 additions & 2 deletions src/db_adapter/DatabaseOrchestrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,24 @@ void DatabaseMappingOrchestrator::database_setup(const JsonConfig& config,
auto postgres_strategy = make_unique<PostgresMappingStrategy>(move(postgres_wrapper));
this->db_conn = move(conn);
this->strategy = move(postgres_strategy);
} else if (adapterdb_type == "mork") {
RAISE_ERROR("Mork adapter support is not implemented yet");
} else {
RAISE_ERROR("Unsupported adapter type: " + adapterdb_type);
}
}

// ==============================
// Construction / destruction
// ==============================

PostgresMappingStrategy::PostgresMappingStrategy(unique_ptr<PostgresWrapper> wrapper)
: wrapper(move(wrapper)) {}

// ==============================
// Public
// ============================

vector<MappingTask> PostgresMappingStrategy::create_tasks(const JsonConfig& config) {
vector<string> file_paths =
config.at_path("adapterdb.context_mapping_paths").get_or<vector<string>>({});
Expand All @@ -108,7 +118,7 @@ vector<MappingTask> PostgresMappingStrategy::create_tasks(const JsonConfig& conf
RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path);
}

auto queries_sql = ContextLoader::load_query_file(path);
auto queries_sql = ContextLoader::load_sql_queries(path);

if (!queries_sql.empty()) {
for (size_t i = 0; i < queries_sql.size(); i++) {
Expand Down Expand Up @@ -137,4 +147,26 @@ void PostgresMappingStrategy::execute_task(const MappingTask& task) {
} else {
this->wrapper->map_sql_query(task.task_name, task.context.value());
}
}
}

// ==============================
// Construction / destruction
// ==============================

MorkMappingStrategy::MorkMappingStrategy(unique_ptr<MorkWrapper> wrapper) {
RAISE_ERROR("MorkMappingStrategy constructor not implemented yet");
}

// ==============================
// Public
// ============================

vector<MappingTask> MorkMappingStrategy::create_tasks(const JsonConfig& config) {
RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet");
return vector<MappingTask>{};
}

void MorkMappingStrategy::execute_task(const MappingTask& task) {
RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet");
return;
}
11 changes: 11 additions & 0 deletions src/db_adapter/DatabaseOrchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "BoundedSharedQueue.h"
#include "DedicatedThread.h"
#include "JsonConfig.h"
#include "MorkWrapper.h"
#include "PostgresWrapper.h"

using namespace atomdb;
Expand Down Expand Up @@ -61,4 +62,14 @@ class PostgresMappingStrategy : public DatabaseMappingStrategy {
unique_ptr<PostgresWrapper> wrapper;
};

class MorkMappingStrategy : public DatabaseMappingStrategy {
public:
explicit MorkMappingStrategy(unique_ptr<MorkWrapper> wrapper);
vector<MappingTask> create_tasks(const JsonConfig& config) override;
void execute_task(const MappingTask& task) override;

private:
unique_ptr<MorkWrapper> wrapper;
};

} // namespace db_adapter
4 changes: 2 additions & 2 deletions src/db_adapter/DatabaseTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct NoSqlDocument {};
* @typedef DbInput
* @brief A variant representing raw input from either SQL or NoSQL sources.
*/
using DbInput = variant<SqlRow, NoSqlDocument>;
using DbInput = variant<SqlRow, NoSqlDocument, string>;

/**
* @struct Table
Expand All @@ -81,7 +81,7 @@ struct Table {
* @enum MAPPER_TYPE
* @brief Defines the strategy used to transform database rows.
*/
enum class MAPPER_TYPE { SQL2ATOMS };
enum class MAPPER_TYPE { SQL2ATOMS, MORK2ATOMS };

struct TableMapping {
string table_name;
Expand Down
2 changes: 2 additions & 0 deletions src/db_adapter/DatabaseWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ shared_ptr<Mapper> DatabaseWrapper::create_mapper(MAPPER_TYPE mapper_type) {
switch (mapper_type) {
case MAPPER_TYPE::SQL2ATOMS:
return make_shared<SQL2AtomsMapper>();
case MAPPER_TYPE::MORK2ATOMS:
return make_shared<Mork2AtomsMapper>();
default:
throw invalid_argument("Unknown mapper type");
}
Expand Down
31 changes: 31 additions & 0 deletions src/db_adapter/mork/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@rules_cc//cc:cc_library.bzl", "cc_library")

package(default_visibility = ["//visibility:public"])

cc_library(
name = "mork_lib",
includes = ["."],
deps = [
":mork_wrapper",
],
)

cc_library(
name = "mork_wrapper",
srcs = ["MorkWrapper.cc"],
hdrs = ["MorkWrapper.h"],
includes = ["."],
linkopts = [
"-L/usr/local/lib",
"-lpqxx",
"-lpq",
],
deps = [
"//atomdb/morkdb:morkdb_lib",
"//commons:commons_lib",
"//db_adapter:bounded_shared_queue",
"//db_adapter:database_connection",
"//db_adapter:database_mapper",
"//db_adapter:database_wrapper",
],
)
Loading
Loading