diff --git a/src/atomdb/adapterdb/AdapterDB.cc b/src/atomdb/adapterdb/AdapterDB.cc index 9a42ec54..203e79ae 100644 --- a/src/atomdb/adapterdb/AdapterDB.cc +++ b/src/atomdb/adapterdb/AdapterDB.cc @@ -223,7 +223,7 @@ void AdapterDB::initialize(bool skip_atomdb_backend_empty) { string type = config.at_path("adapterdb.type").get_or(""); auto adapter_type = parse_adapter_db_type(type); - if (adapter_type != AdapterDbType::Postgres) { + if (adapter_type != AdapterDbType::Postgres && adapter_type != AdapterDbType::Mork) { RAISE_ERROR("AdapterDB: Unsupported database type in config: " + type); } diff --git a/src/atomdb/adapterdb/AdapterDB.h b/src/atomdb/adapterdb/AdapterDB.h index 6e995b65..50f27c1e 100644 --- a/src/atomdb/adapterdb/AdapterDB.h +++ b/src/atomdb/adapterdb/AdapterDB.h @@ -22,10 +22,11 @@ using namespace db_adapter; namespace atomdb { -enum class AdapterDbType { Postgres }; +enum class AdapterDbType { Postgres, Mork }; inline AdapterDbType parse_adapter_db_type(const std::string& value) { if (value == "postgres") return AdapterDbType::Postgres; + if (value == "mork") return AdapterDbType::Mork; RAISE_ERROR("Unsupported adapterdb.type: " + value); } diff --git a/src/db_adapter/BUILD b/src/db_adapter/BUILD index 30a585a9..ac01aa86 100644 --- a/src/db_adapter/BUILD +++ b/src/db_adapter/BUILD @@ -17,6 +17,7 @@ cc_library( ":metta_file_writer", "//commons:commons_lib", "//commons/atoms:atoms_lib", + "//db_adapter/mork:mork_lib", "//db_adapter/postgres:postgres_lib", ], ) @@ -40,6 +41,7 @@ cc_library( ":database_types", "//commons:commons_lib", "//commons/atoms:atoms_lib", + "//metta:metta_lib", ], ) @@ -96,6 +98,7 @@ cc_library( "//atomdb", "//commons:commons_lib", "//commons/atoms:atoms_lib", + "//db_adapter/mork:mork_lib", "//db_adapter/postgres:postgres_lib", ], ) diff --git a/src/db_adapter/ContextLoader.cc b/src/db_adapter/ContextLoader.cc index 0397cddf..9247314c 100644 --- a/src/db_adapter/ContextLoader.cc +++ b/src/db_adapter/ContextLoader.cc @@ -15,121 +15,14 @@ using json = nlohmann::json; namespace fs = std::filesystem; -vector ContextLoader::load_table_file(const string& file_path) { - if (!fs::exists(file_path)) { - RAISE_ERROR("Context file " + file_path + " does not exist"); - } - - ifstream f(file_path); - - json tables = json::parse(f); - - vector out; - - bool has_error = false; - - for (size_t i = 0; i < tables.size(); ++i) { - string msg_base = "table[" + to_string(i) + "]"; - - const json& table = tables[i]; - - TableMapping tm; - - if (!table.contains("table_name")) { - LOG_ERROR(msg_base + " missing required key: 'table_name'"); - has_error = true; - } else if (!table["table_name"].is_string()) { - LOG_ERROR(msg_base + ".table_name must be a string in a table entry"); - has_error = true; - } else { - string tn = table["table_name"].get(); - size_t count_dot = 0; - for (char c : tn) { - if (c == '.') ++count_dot; - } - if (count_dot != 1) { - LOG_ERROR(msg_base + ".table_name must be in format 'schema.table'"); - has_error = true; - } else { - size_t pos = tn.find('.'); - if (pos == 0 || pos + 1 >= tn.size()) { - LOG_ERROR(msg_base + "table_name must be in format 'schema.table'"); - has_error = true; - } - } - } - - if (!table.contains("skip_columns")) { - LOG_ERROR(msg_base + " missing required key: 'skip_columns'"); - has_error = true; - } else { - const json& sc = table["skip_columns"]; - if (!sc.is_null()) { - if (!sc.is_array()) { - LOG_ERROR(msg_base + - ".skip_columns must be an array of strings or null in a table entry"); - has_error = true; - } else { - tm.skip_columns.emplace(); - for (size_t k = 0; k < sc.size(); ++k) { - if (!sc[k].is_string()) { - LOG_ERROR(msg_base + ".skip_columns[" + to_string(k) + - "] must be a string in a table entry"); - has_error = true; - } else { - tm.skip_columns->push_back(sc[k]); - } - } - } - } - } - - if (!table.contains("where_clauses")) { - LOG_ERROR(msg_base + " missing required key: 'where_clauses'"); - has_error = true; - } else { - const json& wc = table["where_clauses"]; - if (!wc.is_null()) { - if (!wc.is_array()) { - LOG_ERROR(msg_base + - ".where_clauses must be an array of strings or null in a table entry"); - has_error = true; - } else { - tm.where_clauses.emplace(); - for (size_t k = 0; k < wc.size(); ++k) { - if (!wc[k].is_string()) { - LOG_ERROR(msg_base + ".where_clauses[" + to_string(k) + - "] must be a string in a table entry"); - has_error = true; - } else { - tm.where_clauses->push_back(wc[k]); - } - } - } - } - } - - if (!has_error) { - tm.table_name = table["table_name"]; - out.push_back(tm); - } - } - - if (has_error) { - LOG_ERROR("Context file validation failed with errors. Please fix the issues and try again."); - return vector{}; - } - return out; -} - -vector ContextLoader::load_query_file(const string& file_path) { +vector ContextLoader::load_sql_queries(const string& file_path) { if (!fs::exists(file_path)) { RAISE_ERROR("Query file " + file_path + " does not exist"); } ifstream f(file_path); - vector out; + vector queries; string query; string line; @@ -139,14 +32,19 @@ vector ContextLoader::load_query_file(const string& file_path) { if (!line.empty() && !is_comment) { query += line + " "; } else { - out.push_back(query); + queries.push_back(query); query.clear(); } } if (!query.empty()) { - out.push_back(query); + queries.push_back(query); } - return out; -} \ No newline at end of file + return queries; +} + +vector ContextLoader::load_metta_queries(const string& file_path) { + RAISE_ERROR("ContextLoader::load_metta_queries() not implemented yet"); + return {}; +} diff --git a/src/db_adapter/ContextLoader.h b/src/db_adapter/ContextLoader.h index a9f7063b..13a9eb50 100644 --- a/src/db_adapter/ContextLoader.h +++ b/src/db_adapter/ContextLoader.h @@ -8,6 +8,6 @@ using namespace db_adapter; class ContextLoader { public: - static vector load_table_file(const string& file_path); - static vector load_query_file(const string& file_path); + static vector load_sql_queries(const string& file_path); + static vector load_metta_queries(const string& file_path); }; \ No newline at end of file diff --git a/src/db_adapter/DatabaseMapper.cc b/src/db_adapter/DatabaseMapper.cc index 1d0a3d22..7fcf46dc 100644 --- a/src/db_adapter/DatabaseMapper.cc +++ b/src/db_adapter/DatabaseMapper.cc @@ -8,6 +8,7 @@ #include "Hasher.h" #include "Link.h" #include "MettaMapping.h" +#include "MettaParser.h" #include "Node.h" #define LOG_LEVEL INFO_LEVEL @@ -313,3 +314,22 @@ void SQL2AtomsMapper::map_foreign_keys_combinations( } } } + +// ============================== +// Construction / destruction +// ============================== + +Metta2AtomsMapper::Metta2AtomsMapper() { + RAISE_ERROR("Metta2AtomsMapper constructor not implemented yet"); +} + +Metta2AtomsMapper::~Metta2AtomsMapper() {} + +// ============================== +// Public +// ============================== + +const vector Metta2AtomsMapper::map(const DbInput& data) { + RAISE_ERROR("Metta2AtomsMapper::map() not implemented yet"); + return vector{}; +} \ No newline at end of file diff --git a/src/db_adapter/DatabaseMapper.h b/src/db_adapter/DatabaseMapper.h index 77659653..9b253380 100644 --- a/src/db_adapter/DatabaseMapper.h +++ b/src/db_adapter/DatabaseMapper.h @@ -10,6 +10,7 @@ #include "DatabaseTypes.h" #include "HandleTrie.h" #include "MettaMapping.h" +#include "MettaParserActions.h" using namespace std; using namespace atoms; @@ -85,4 +86,15 @@ class SQL2AtomsMapper : public Mapper { void map_foreign_keys_combinations(const vector>& all_foreign_keys); }; +class Metta2AtomsMapper : public Mapper { + public: + Metta2AtomsMapper(); + ~Metta2AtomsMapper() override; + + const vector map(const DbInput& data) override; + + private: + vector atoms; + shared_ptr parser_actions; +}; } // namespace db_adapter \ No newline at end of file diff --git a/src/db_adapter/DatabaseOrchestrator.cc b/src/db_adapter/DatabaseOrchestrator.cc index 9d646d42..ff6f0e96 100644 --- a/src/db_adapter/DatabaseOrchestrator.cc +++ b/src/db_adapter/DatabaseOrchestrator.cc @@ -80,14 +80,24 @@ void DatabaseMappingOrchestrator::database_setup(const JsonConfig& config, auto postgres_strategy = make_unique(move(postgres_wrapper)); this->db_conn = move(conn); this->strategy = move(postgres_strategy); + } else if (adapterdb_type == "mork") { + RAISE_ERROR("Mork adapter support is not implemented yet"); } else { RAISE_ERROR("Unsupported adapter type: " + adapterdb_type); } } +// ============================== +// Construction / destruction +// ============================== + PostgresMappingStrategy::PostgresMappingStrategy(unique_ptr wrapper) : wrapper(move(wrapper)) {} +// ============================== +// Public +// ============================ + vector PostgresMappingStrategy::create_tasks(const JsonConfig& config) { vector file_paths = config.at_path("adapterdb.context_mapping_paths").get_or>({}); @@ -108,7 +118,7 @@ vector PostgresMappingStrategy::create_tasks(const JsonConfig& conf RAISE_ERROR("Unsupported mapping file type: " + ext + " for file: " + path); } - auto queries_sql = ContextLoader::load_query_file(path); + auto queries_sql = ContextLoader::load_sql_queries(path); if (!queries_sql.empty()) { for (size_t i = 0; i < queries_sql.size(); i++) { @@ -137,4 +147,26 @@ void PostgresMappingStrategy::execute_task(const MappingTask& task) { } else { this->wrapper->map_sql_query(task.task_name, task.context.value()); } -} \ No newline at end of file +} + +// ============================== +// Construction / destruction +// ============================== + +MorkMappingStrategy::MorkMappingStrategy(unique_ptr wrapper) { + RAISE_ERROR("MorkMappingStrategy constructor not implemented yet"); +} + +// ============================== +// Public +// ============================ + +vector MorkMappingStrategy::create_tasks(const JsonConfig& config) { + RAISE_ERROR("MorkMappingStrategy::create_tasks() not implemented yet"); + return vector{}; +} + +void MorkMappingStrategy::execute_task(const MappingTask& task) { + RAISE_ERROR("MorkMappingStrategy::execute_task() not implemented yet"); + return; +} diff --git a/src/db_adapter/DatabaseOrchestrator.h b/src/db_adapter/DatabaseOrchestrator.h index 6aaeec6e..38100fb6 100644 --- a/src/db_adapter/DatabaseOrchestrator.h +++ b/src/db_adapter/DatabaseOrchestrator.h @@ -9,6 +9,7 @@ #include "BoundedSharedQueue.h" #include "DedicatedThread.h" #include "JsonConfig.h" +#include "MorkWrapper.h" #include "PostgresWrapper.h" using namespace atomdb; @@ -61,4 +62,14 @@ class PostgresMappingStrategy : public DatabaseMappingStrategy { unique_ptr wrapper; }; +class MorkMappingStrategy : public DatabaseMappingStrategy { + public: + explicit MorkMappingStrategy(unique_ptr wrapper); + vector create_tasks(const JsonConfig& config) override; + void execute_task(const MappingTask& task) override; + + private: + unique_ptr wrapper; +}; + } // namespace db_adapter diff --git a/src/db_adapter/DatabaseTypes.h b/src/db_adapter/DatabaseTypes.h index 2cbe1768..d8046dad 100644 --- a/src/db_adapter/DatabaseTypes.h +++ b/src/db_adapter/DatabaseTypes.h @@ -57,13 +57,18 @@ struct SqlRow { size_t size() const { return (primary_key ? 1 : 0) + fields.size(); } }; +struct MettaExpression { + string expression; +}; + struct NoSqlDocument {}; /** * @typedef DbInput - * @brief A variant representing raw input from either SQL or NoSQL sources. + * @brief A variant representing raw input from the database, which can be a SQL row, a NoSQL document, + * or a Metta expression. */ -using DbInput = variant; +using DbInput = variant; /** * @struct Table @@ -81,7 +86,7 @@ struct Table { * @enum MAPPER_TYPE * @brief Defines the strategy used to transform database rows. */ -enum class MAPPER_TYPE { SQL2ATOMS }; +enum class MAPPER_TYPE { SQL2ATOMS, METTA2ATOMS }; struct TableMapping { string table_name; diff --git a/src/db_adapter/DatabaseWrapper.cc b/src/db_adapter/DatabaseWrapper.cc index 5097ab54..00aec3ee 100644 --- a/src/db_adapter/DatabaseWrapper.cc +++ b/src/db_adapter/DatabaseWrapper.cc @@ -1,5 +1,8 @@ #include "DatabaseWrapper.h" +using namespace std; +using namespace db_adapter; + DatabaseWrapper::DatabaseWrapper(DatabaseConnection& db_client, MAPPER_TYPE mapper_type) : db_client(db_client), mapper(create_mapper(mapper_type)), mapper_type(mapper_type) {} @@ -9,6 +12,8 @@ shared_ptr DatabaseWrapper::create_mapper(MAPPER_TYPE mapper_type) { switch (mapper_type) { case MAPPER_TYPE::SQL2ATOMS: return make_shared(); + case MAPPER_TYPE::METTA2ATOMS: + return make_shared(); default: throw invalid_argument("Unknown mapper type"); } diff --git a/src/db_adapter/DatabaseWrapper.h b/src/db_adapter/DatabaseWrapper.h index 4c4ade84..39a67bc9 100644 --- a/src/db_adapter/DatabaseWrapper.h +++ b/src/db_adapter/DatabaseWrapper.h @@ -11,7 +11,6 @@ #include "DatabaseTypes.h" using namespace std; -using namespace db_adapter; using namespace commons; namespace db_adapter { diff --git a/src/db_adapter/mork/BUILD b/src/db_adapter/mork/BUILD new file mode 100644 index 00000000..b8313c2b --- /dev/null +++ b/src/db_adapter/mork/BUILD @@ -0,0 +1,38 @@ +load("@rules_cc//cc:cc_library.bzl", "cc_library") + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "mork_lib", + includes = ["."], + deps = [ + ":mork_connection", + ":mork_wrapper", + ], +) + +cc_library( + name = "mork_wrapper", + srcs = ["MorkWrapper.cc"], + hdrs = ["MorkWrapper.h"], + includes = ["."], + deps = [ + ":mork_connection", + "//commons:commons_lib", + "//db_adapter:bounded_shared_queue", + "//db_adapter:database_types", + "//db_adapter:database_wrapper", + ], +) + +cc_library( + name = "mork_connection", + srcs = ["MorkConnection.cc"], + hdrs = ["MorkConnection.h"], + includes = ["."], + deps = [ + "//atomdb/morkdb:morkdb_lib", + "//commons:commons_lib", + "//db_adapter:database_connection", + ], +) diff --git a/src/db_adapter/mork/MorkConnection.cc b/src/db_adapter/mork/MorkConnection.cc new file mode 100644 index 00000000..f25a0a67 --- /dev/null +++ b/src/db_adapter/mork/MorkConnection.cc @@ -0,0 +1,37 @@ +#include "MorkConnection.h" + +#include "Utils.h" + +using namespace std; +using namespace atomdb; +using namespace db_adapter; + +// ============================== +// Construction / destruction +// ============================== + +MorkConnection::MorkConnection(const string& id, const string& host, int port) + : DatabaseConnection(id, host, port) { + RAISE_ERROR("MorkConnection constructor not implemented yet"); +} + +MorkConnection::~MorkConnection() {} + +// ============================== +// Public +// ============================== + +void MorkConnection::connect() { + RAISE_ERROR("MorkConnection::connect() not implemented yet"); + return; +} + +void MorkConnection::disconnect() { + RAISE_ERROR("MorkConnection::disconnect() not implemented yet"); + return; +} + +vector MorkConnection::query(const string& metta_query) { + RAISE_ERROR("MorkConnection::query() not implemented yet"); + return {}; +} \ No newline at end of file diff --git a/src/db_adapter/mork/MorkConnection.h b/src/db_adapter/mork/MorkConnection.h new file mode 100644 index 00000000..0e93c494 --- /dev/null +++ b/src/db_adapter/mork/MorkConnection.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include + +#include "DatabaseConnection.h" +#include "MorkDB.h" + +using namespace std; +using namespace atomdb; + +namespace db_adapter { + +class MorkConnection : public DatabaseConnection { + public: + MorkConnection(const string& id, const string& host, int port); + ~MorkConnection(); + + void connect() override; + void disconnect() override; + + vector query(const string& metta_query); + + protected: + unique_ptr conn; +}; + +} // namespace db_adapter diff --git a/src/db_adapter/mork/MorkWrapper.cc b/src/db_adapter/mork/MorkWrapper.cc new file mode 100644 index 00000000..3b468d70 --- /dev/null +++ b/src/db_adapter/mork/MorkWrapper.cc @@ -0,0 +1,28 @@ +#include "MorkWrapper.h" + +#include "MorkConnection.h" +#include "Utils.h" + +#define LOG_LEVEL INFO_LEVEL +#include "Logger.h" + +using namespace std; +using namespace db_adapter; + +// ============================== +// Construction / destruction +// ============================== + +MorkWrapper::MorkWrapper(MorkConnection& conn, + shared_ptr output_queue, + MAPPER_TYPE mapper_type) + : DatabaseWrapper(conn, mapper_type), conn(conn), output_queue(output_queue) { + RAISE_ERROR("MorkWrapper constructor not implemented yet"); +} + +MorkWrapper::~MorkWrapper() {} + +void MorkWrapper::map(const string& metta_query) { + RAISE_ERROR("MorkWrapper::map() not implemented yet"); + return; +} diff --git a/src/db_adapter/mork/MorkWrapper.h b/src/db_adapter/mork/MorkWrapper.h new file mode 100644 index 00000000..25a2b4d3 --- /dev/null +++ b/src/db_adapter/mork/MorkWrapper.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + +#include "BoundedSharedQueue.h" +#include "DatabaseTypes.h" +#include "DatabaseWrapper.h" +#include "MorkConnection.h" + +using namespace std; + +namespace db_adapter { + +class MorkWrapper : public DatabaseWrapper { + public: + MorkWrapper(MorkConnection& conn, + shared_ptr output_queue, + MAPPER_TYPE mapper_type = MAPPER_TYPE::METTA2ATOMS); + ~MorkWrapper(); + + void map(const string& metta_query); + + private: + MorkConnection& conn; + shared_ptr output_queue; +}; + +} // namespace db_adapter diff --git a/src/db_adapter/postgres/PostgresWrapper.cc b/src/db_adapter/postgres/PostgresWrapper.cc index 9c33c534..ad6d0be9 100644 --- a/src/db_adapter/postgres/PostgresWrapper.cc +++ b/src/db_adapter/postgres/PostgresWrapper.cc @@ -14,6 +14,7 @@ #include "Logger.h" using namespace std; +using namespace db_adapter; // ============================== // Construction / destruction