Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/atomdb/adapterdb/AdapterDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void AdapterDB::initialize(bool skip_atomdb_backend_empty) {
string type = config.at_path("adapterdb.type").get_or<string>("");
auto adapter_type = parse_adapter_db_type(type);

if (adapter_type != AdapterDbType::Postgres) {
if (adapter_type != AdapterDbType::Postgres && adapter_type != AdapterDbType::Mork) {
RAISE_ERROR("AdapterDB: Unsupported database type in config: " + type);
}

Expand Down
3 changes: 2 additions & 1 deletion src/atomdb/adapterdb/AdapterDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ using namespace db_adapter;

namespace atomdb {

enum class AdapterDbType { Postgres };
enum class AdapterDbType { Postgres, Mork };

inline AdapterDbType parse_adapter_db_type(const std::string& value) {
if (value == "postgres") return AdapterDbType::Postgres;
if (value == "mork") return AdapterDbType::Mork;
RAISE_ERROR("Unsupported adapterdb.type: " + value);
}

Expand Down
3 changes: 3 additions & 0 deletions src/db_adapter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ cc_library(
":metta_file_writer",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//db_adapter/mork:mork_lib",
"//db_adapter/postgres:postgres_lib",
],
)
Expand All @@ -40,6 +41,7 @@ cc_library(
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//metta:metta_lib",
],
)

Expand Down Expand Up @@ -96,6 +98,7 @@ cc_library(
"//atomdb",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//db_adapter/mork:mork_lib",
"//db_adapter/postgres:postgres_lib",
],
)
Expand Down
124 changes: 11 additions & 113 deletions src/db_adapter/ContextLoader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,121 +15,14 @@ using json = nlohmann::json;

namespace fs = std::filesystem;

vector<TableMapping> ContextLoader::load_table_file(const string& file_path) {
if (!fs::exists(file_path)) {
RAISE_ERROR("Context file " + file_path + " does not exist");
}

ifstream f(file_path);

json tables = json::parse(f);

vector<TableMapping> out;

bool has_error = false;

for (size_t i = 0; i < tables.size(); ++i) {
string msg_base = "table[" + to_string(i) + "]";

const json& table = tables[i];

TableMapping tm;

if (!table.contains("table_name")) {
LOG_ERROR(msg_base + " missing required key: 'table_name'");
has_error = true;
} else if (!table["table_name"].is_string()) {
LOG_ERROR(msg_base + ".table_name must be a string in a table entry");
has_error = true;
} else {
string tn = table["table_name"].get<string>();
size_t count_dot = 0;
for (char c : tn) {
if (c == '.') ++count_dot;
}
if (count_dot != 1) {
LOG_ERROR(msg_base + ".table_name must be in format 'schema.table'");
has_error = true;
} else {
size_t pos = tn.find('.');
if (pos == 0 || pos + 1 >= tn.size()) {
LOG_ERROR(msg_base + "table_name must be in format 'schema.table'");
has_error = true;
}
}
}

if (!table.contains("skip_columns")) {
LOG_ERROR(msg_base + " missing required key: 'skip_columns'");
has_error = true;
} else {
const json& sc = table["skip_columns"];
if (!sc.is_null()) {
if (!sc.is_array()) {
LOG_ERROR(msg_base +
".skip_columns must be an array of strings or null in a table entry");
has_error = true;
} else {
tm.skip_columns.emplace();
for (size_t k = 0; k < sc.size(); ++k) {
if (!sc[k].is_string()) {
LOG_ERROR(msg_base + ".skip_columns[" + to_string(k) +
"] must be a string in a table entry");
has_error = true;
} else {
tm.skip_columns->push_back(sc[k]);
}
}
}
}
}

if (!table.contains("where_clauses")) {
LOG_ERROR(msg_base + " missing required key: 'where_clauses'");
has_error = true;
} else {
const json& wc = table["where_clauses"];
if (!wc.is_null()) {
if (!wc.is_array()) {
LOG_ERROR(msg_base +
".where_clauses must be an array of strings or null in a table entry");
has_error = true;
} else {
tm.where_clauses.emplace();
for (size_t k = 0; k < wc.size(); ++k) {
if (!wc[k].is_string()) {
LOG_ERROR(msg_base + ".where_clauses[" + to_string(k) +
"] must be a string in a table entry");
has_error = true;
} else {
tm.where_clauses->push_back(wc[k]);
}
}
}
}
}

if (!has_error) {
tm.table_name = table["table_name"];
out.push_back(tm);
}
}

if (has_error) {
LOG_ERROR("Context file validation failed with errors. Please fix the issues and try again.");
return vector<TableMapping>{};
}
return out;
}

vector<string> ContextLoader::load_query_file(const string& file_path) {
vector<string> ContextLoader::load_sql_queries(const string& file_path) {
if (!fs::exists(file_path)) {
RAISE_ERROR("Query file " + file_path + " does not exist");
}

ifstream f(file_path);

vector<string> out;
vector<string> queries;
string query;
string line;

Expand All @@ -139,14 +32,19 @@ vector<string> ContextLoader::load_query_file(const string& file_path) {
if (!line.empty() && !is_comment) {
query += line + " ";
} else {
out.push_back(query);
queries.push_back(query);
query.clear();
}
}

if (!query.empty()) {
out.push_back(query);
queries.push_back(query);
}

return out;
}
return queries;
}

vector<string> ContextLoader::load_metta_queries(const string& file_path) {
RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet");
return {};
}
4 changes: 2 additions & 2 deletions src/db_adapter/ContextLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ using namespace db_adapter;

class ContextLoader {
public:
static vector<TableMapping> load_table_file(const string& file_path);
static vector<string> load_query_file(const string& file_path);
static vector<string> load_sql_queries(const string& file_path);
static vector<string> load_metta_queries(const string& file_path);
};
20 changes: 20 additions & 0 deletions src/db_adapter/DatabaseMapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "Hasher.h"
#include "Link.h"
#include "MettaMapping.h"
#include "MettaParser.h"
#include "Node.h"

#define LOG_LEVEL INFO_LEVEL
Expand Down Expand Up @@ -313,3 +314,22 @@ void SQL2AtomsMapper::map_foreign_keys_combinations(
}
}
}

// ==============================
// Construction / destruction
// ==============================

Metta2AtomsMapper::Metta2AtomsMapper() {
RAISE_ERROR("Metta2AtomsMapper constructor not implemented yet");
}

Metta2AtomsMapper::~Metta2AtomsMapper() {}

// ==============================
// Public
// ==============================

const vector<Atom*> Metta2AtomsMapper::map(const DbInput& data) {
RAISE_ERROR("Metta2AtomsMapper::map() not implemented yet");
return vector<Atom*>{};
}
12 changes: 12 additions & 0 deletions src/db_adapter/DatabaseMapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "DatabaseTypes.h"
#include "HandleTrie.h"
#include "MettaMapping.h"
#include "MettaParserActions.h"

using namespace std;
using namespace atoms;
Expand Down Expand Up @@ -85,4 +86,15 @@ class SQL2AtomsMapper : public Mapper {
void map_foreign_keys_combinations(const vector<tuple<string, string, string>>& all_foreign_keys);
};

class Metta2AtomsMapper : public Mapper {
public:
Metta2AtomsMapper();
~Metta2AtomsMapper() override;

const vector<Atom*> map(const DbInput& data) override;

private:
vector<Atom*> atoms;
shared_ptr<MettaParserActions> parser_actions;
};
} // namespace db_adapter
36 changes: 34 additions & 2 deletions src/db_adapter/DatabaseOrchestrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,24 @@ void DatabaseMappingOrchestrator::database_setup(const JsonConfig& config,
auto postgres_strategy = make_unique<PostgresMappingStrategy>(move(postgres_wrapper));
this->db_conn = move(conn);
this->strategy = move(postgres_strategy);
} else if (adapterdb_type == "mork") {
RAISE_ERROR("Mork adapter support is not implemented yet");
} else {
RAISE_ERROR("Unsupported adapter type: " + adapterdb_type);
}
}

// ==============================
// Construction / destruction
// ==============================

PostgresMappingStrategy::PostgresMappingStrategy(unique_ptr<PostgresWrapper> wrapper)
: wrapper(move(wrapper)) {}

// ==============================
// Public
// ============================

vector<MappingTask> PostgresMappingStrategy::create_tasks(const JsonConfig& config) {
vector<string> file_paths =
config.at_path("adapterdb.context_mapping_paths").get_or<vector<string>>({});
Expand All @@ -108,7 +118,7 @@ vector<MappingTask> PostgresMappingStrategy::create_tasks(const JsonConfig& conf
RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path);
}

auto queries_sql = ContextLoader::load_query_file(path);
auto queries_sql = ContextLoader::load_sql_queries(path);

if (!queries_sql.empty()) {
for (size_t i = 0; i < queries_sql.size(); i++) {
Expand Down Expand Up @@ -137,4 +147,26 @@ void PostgresMappingStrategy::execute_task(const MappingTask& task) {
} else {
this->wrapper->map_sql_query(task.task_name, task.context.value());
}
}
}

// ==============================
// Construction / destruction
// ==============================

MorkMappingStrategy::MorkMappingStrategy(unique_ptr<MorkWrapper> wrapper) {
RAISE_ERROR("MorkMappingStrategy constructor not implemented yet");
}

// ==============================
// Public
// ============================

vector<MappingTask> MorkMappingStrategy::create_tasks(const JsonConfig& config) {
RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet");
return vector<MappingTask>{};
}

void MorkMappingStrategy::execute_task(const MappingTask& task) {
RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet");
return;
}
11 changes: 11 additions & 0 deletions src/db_adapter/DatabaseOrchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "BoundedSharedQueue.h"
#include "DedicatedThread.h"
#include "JsonConfig.h"
#include "MorkWrapper.h"
#include "PostgresWrapper.h"

using namespace atomdb;
Expand Down Expand Up @@ -61,4 +62,14 @@ class PostgresMappingStrategy : public DatabaseMappingStrategy {
unique_ptr<PostgresWrapper> wrapper;
};

class MorkMappingStrategy : public DatabaseMappingStrategy {
public:
explicit MorkMappingStrategy(unique_ptr<MorkWrapper> wrapper);
vector<MappingTask> create_tasks(const JsonConfig& config) override;
void execute_task(const MappingTask& task) override;

private:
unique_ptr<MorkWrapper> wrapper;
};

} // namespace db_adapter
11 changes: 8 additions & 3 deletions src/db_adapter/DatabaseTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,18 @@ struct SqlRow {
size_t size() const { return (primary_key ? 1 : 0) + fields.size(); }
};

struct MettaExpression {
string expression;
};

struct NoSqlDocument {};

/**
* @typedef DbInput
* @brief A variant representing raw input from either SQL or NoSQL sources.
* @brief A variant representing raw input from the database, which can be a SQL row, a NoSQL document,
* or a Metta expression.
*/
using DbInput = variant<SqlRow, NoSqlDocument>;
using DbInput = variant<SqlRow, NoSqlDocument, MettaExpression>;

/**
* @struct Table
Expand All @@ -81,7 +86,7 @@ struct Table {
* @enum MAPPER_TYPE
* @brief Defines the strategy used to transform database rows.
*/
enum class MAPPER_TYPE { SQL2ATOMS };
enum class MAPPER_TYPE { SQL2ATOMS, METTA2ATOMS };

struct TableMapping {
string table_name;
Expand Down
5 changes: 5 additions & 0 deletions src/db_adapter/DatabaseWrapper.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "DatabaseWrapper.h"

using namespace std;
using namespace db_adapter;

DatabaseWrapper::DatabaseWrapper(DatabaseConnection& db_client, MAPPER_TYPE mapper_type)
: db_client(db_client), mapper(create_mapper(mapper_type)), mapper_type(mapper_type) {}

Expand All @@ -9,6 +12,8 @@ shared_ptr<Mapper> DatabaseWrapper::create_mapper(MAPPER_TYPE mapper_type) {
switch (mapper_type) {
case MAPPER_TYPE::SQL2ATOMS:
return make_shared<SQL2AtomsMapper>();
case MAPPER_TYPE::METTA2ATOMS:
return make_shared<Metta2AtomsMapper>();
default:
throw invalid_argument("Unknown mapper type");
}
Expand Down
Loading
Loading