From 48772c0467d32647dbfcfe2bb0530885ad9dd612 Mon Sep 17 00:00:00 2001 From: Aashutosh-cognite Date: Tue, 26 May 2026 14:52:19 +0530 Subject: [PATCH] Add cdf_ingestion_foundation module for Foundation DP Orchestrates two-phase ingestion workflow (population + contextualization) with config-driven task generation via build_workflow.py and configure_datamodel.py. Rebased onto foundational-dp-cleanup so this PR contains only the ingestion foundation module. Co-authored-by: Cursor --- .../common/cdf_ingestion_foundation/README.md | 239 ++++++++++++ .../auth/grp_workflow.Group.yaml | 26 ++ .../auth/grp_workflow_user.Group.yaml | 18 + .../default.config.yaml | 59 +++ .../cdf_ingestion_foundation/module.toml | 4 + .../scripts/_pack_config.py | 189 ++++++++++ .../scripts/build_workflow.py | 204 +++++++++++ .../scripts/configure_datamodel.py | 344 ++++++++++++++++++ .../sap_equipment_to_asset.Transformation.sql | 2 + ...sap_equipment_to_asset.Transformation.yaml | 13 + .../sap_operation_to_order.Transformation.sql | 2 + ...sap_operation_to_order.Transformation.yaml | 13 + .../opcua_timeseries.Transformation.sql | 2 + .../opcua_timeseries.Transformation.yaml | 13 + .../pi_timeseries.Transformation.sql | 2 + .../pi_timeseries.Transformation.yaml | 13 + .../population/sap_assets.Transformation.sql | 2 + .../population/sap_assets.Transformation.yaml | 13 + .../sap_equipment.Transformation.sql | 2 + .../sap_equipment.Transformation.yaml | 13 + .../sap_maintenance_orders.Transformation.sql | 2 + ...sap_maintenance_orders.Transformation.yaml | 13 + .../sap_operations.Transformation.sql | 2 + .../sap_operations.Transformation.yaml | 13 + ...acturing_extension.equipment_to_asset.yaml | 14 + ...acturing_extension.operation_to_order.yaml | 14 + .../tasks/task.opcua_timeseries.yaml | 11 + .../tasks/task.pi_timeseries.yaml | 11 + .../tasks/task.sap_assets.yaml | 11 + .../tasks/task.sap_equipment.yaml | 13 + .../tasks/task.sap_maintenance_orders.yaml | 13 + .../tasks/task.sap_operations.yaml | 13 + .../workflows/wf_ingestion.Workflow.yaml | 4 + .../wf_ingestion_trigger.WorkflowTrigger.yaml | 9 + .../wf_ingestion_v1.WorkflowVersion.yaml | 100 +++++ 35 files changed, 1416 insertions(+) create mode 100644 modules/common/cdf_ingestion_foundation/README.md create mode 100644 modules/common/cdf_ingestion_foundation/auth/grp_workflow.Group.yaml create mode 100644 modules/common/cdf_ingestion_foundation/auth/grp_workflow_user.Group.yaml create mode 100644 modules/common/cdf_ingestion_foundation/default.config.yaml create mode 100644 modules/common/cdf_ingestion_foundation/module.toml create mode 100644 modules/common/cdf_ingestion_foundation/scripts/_pack_config.py create mode 100644 modules/common/cdf_ingestion_foundation/scripts/build_workflow.py create mode 100644 modules/common/cdf_ingestion_foundation/scripts/configure_datamodel.py create mode 100644 modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.sql create mode 100644 modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.equipment_to_asset.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.operation_to_order.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.opcua_timeseries.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.pi_timeseries.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_assets.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_equipment.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_maintenance_orders.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_operations.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflows/wf_ingestion.Workflow.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_trigger.WorkflowTrigger.yaml create mode 100644 modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_v1.WorkflowVersion.yaml diff --git a/modules/common/cdf_ingestion_foundation/README.md b/modules/common/cdf_ingestion_foundation/README.md new file mode 100644 index 00000000..76e3ed50 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/README.md @@ -0,0 +1,239 @@ +# CDF Ingestion Foundation Module + +Orchestrates the full ingestion workflow across all deployed source system modules. Assembles a two-phase CDF Workflow from per-task snippets driven by `config..yaml` — no manual WorkflowVersion editing when enabling or disabling sources. + +## Module Architecture + +``` +cdf_ingestion_foundation/ +├── auth/ +│ ├── grp_workflow.Group.yaml # Service account for workflow execution +│ └── grp_workflow_user.Group.yaml # Users who can trigger/monitor workflows +├── workflows/ +│ ├── wf_ingestion.Workflow.yaml # Workflow resource (singleton) +│ ├── wf_ingestion_trigger.WorkflowTrigger.yaml # Cron schedule trigger +│ └── wf_ingestion_v1.WorkflowVersion.yaml # GENERATED — do not edit by hand +├── workflow_template/ +│ └── tasks/ # One YAML snippet per workflow task +│ ├── task.pi_timeseries.yaml +│ ├── task.opcua_timeseries.yaml +│ ├── task.sap_assets.yaml +│ ├── task.sap_equipment.yaml +│ ├── task.sap_maintenance_orders.yaml +│ ├── task.sap_operations.yaml +│ ├── ctx.isa_manufacturing_extension.equipment_to_asset.yaml +│ └── ctx.isa_manufacturing_extension.operation_to_order.yaml +├── transformations/ +│ ├── population/ # Phase 1: source → DM instance transformations +│ │ ├── pi_timeseries.Transformation.yaml +│ │ ├── pi_timeseries.Transformation.sql # TODO: populate with SQL +│ │ ├── opcua_timeseries.Transformation.yaml +│ │ ├── opcua_timeseries.Transformation.sql # TODO: populate with SQL +│ │ ├── sap_assets.Transformation.yaml +│ │ ├── sap_assets.Transformation.sql # TODO: populate with SQL +│ │ ├── sap_equipment.Transformation.yaml +│ │ ├── sap_equipment.Transformation.sql # TODO: populate with SQL +│ │ ├── sap_maintenance_orders.Transformation.yaml +│ │ ├── sap_maintenance_orders.Transformation.sql # TODO: populate with SQL +│ │ ├── sap_operations.Transformation.yaml +│ │ └── sap_operations.Transformation.sql # TODO: populate with SQL +│ └── contextualization/ # Phase 2: edge / relationship transformations +│ ├── sap_equipment_to_asset.Transformation.yaml +│ ├── sap_equipment_to_asset.Transformation.sql # TODO: populate with SQL +│ ├── sap_operation_to_order.Transformation.yaml +│ └── sap_operation_to_order.Transformation.sql # TODO: populate with SQL +├── scripts/ +│ ├── _pack_config.py # Shared config / path helpers +│ ├── configure_datamodel.py # 1) Detect model under modules/data_models/, sync config..yaml +│ └── build_workflow.py # 2) Generate WorkflowVersion from config..yaml +└── module.toml +``` + +## Workflow Design + +The ingestion workflow runs in two phases: + +**Phase 1 — Population**: Source system transformations populate DM instances from RAW data. All population tasks can run in parallel (except where RAW data has inherent ordering, e.g., SAP assets must exist before equipment). + +**Phase 2 — Contextualization**: Relationship transformations (`ctx.*` tasks) set edges between DM instances. These run after all population tasks they depend on have completed. + +``` +[task_pi_timeseries] [task_sap_assets] + │ + ┌──────────────┼──────────────┐ + ▼ ▼ ▼ + [task_sap_equipment] [task_sap_maintenance_orders] + │ │ + │ [task_sap_operations] + │ │ + ▼ ▼ + [ctx_isa_equipment_to_asset] [ctx_isa_operation_to_order] +``` + +## Transformations + +Transformation definitions live in `transformations/population/` (Phase 1) and `transformations/contextualization/` (Phase 2). Each transformation has a `.yaml` resource file (external ID, destination view, schedule) and a companion `.sql` file. + +**The SQL files are scaffolds — they contain a placeholder comment only.** Populate each `.sql` with the actual query for your source system before deploying. + +| File | Source | Destination view | +|---|---|---| +| `pi_timeseries.sql` | `timeseries()` filtered by `{{piDataset}}` | `ISATimeSeries` | +| `opcua_timeseries.sql` | `db_{{location}}_opcua.timeseries` (RAW) | `ISATimeSeries` | +| `sap_assets.sql` | `db_{{location}}_sap.functional_location` (RAW) | `ISAAsset` | +| `sap_equipment.sql` | `db_{{location}}_sap.equipment` (RAW) | `Equipment` | +| `sap_maintenance_orders.sql` | `db_{{location}}_sap.workorder` (RAW) | `WorkOrder` | +| `sap_operations.sql` | `db_{{location}}_sap.workitem` (RAW) | `Operation` | +| `sap_equipment_to_asset.sql` | equipment → functional_location join on `Floc` | `Equipment.asset` edge | +| `sap_operation_to_order.sql` | workitem → workorder join on `OrderId` | `Operation.workOrder` edge | + +## Setup scripts (run in order) + +### 1. Configure data model — `configure_datamodel.py` + +Detects: + +1. **Data model** under `modules/data_models/` (`isa_manufacturing_extension` or `cfihos_oil_and_gas_extension`) +2. **Source systems** under `modules/sourcesystem/` — sets `enabledSources` from installed module folders: + +| Module directory | `enabledSources` key | Ingestion workflow tasks | +|---|---|---| +| `cdf_pi_foundation` | `pi` | `task_pi_timeseries` | +| `cdf_opcua_foundation` | `opcua` | `task_opcua_timeseries` | +| `cdf_sap_foundation` | `sap` | SAP population + ISA relationship tasks | +| `cdf_db_foundation` | `db` | None (DB extractor → RAW only) | +| `cdf_files_foundation` | `files` | None (Files extractor → CDF Files only) | + +Updates every discovered `config..yaml` with **contextualization**, **sourcesystem** (installed modules only), **common.cdf_ingestion_foundation** (`dataModelVariant`, `enabledSources`, …), and **data_models**. + +```bash +cd modules/common/cdf_ingestion_foundation +python3 scripts/configure_datamodel.py -y +python3 scripts/configure_datamodel.py --check # CI +``` + +### 2. Build workflow — `build_workflow.py` + +Reads `variables.modules.common.cdf_ingestion_foundation` from `config..yaml` (default env from `cdf.toml` `default_env`, or `--env`). + +```bash +python3 scripts/build_workflow.py +python3 scripts/build_workflow.py --env prod +python3 scripts/build_workflow.py --check +``` + +The script: +1. Reads `enabledSources`, `dataModelVariant`, and `enabledContextualization` from `config..yaml` +2. Selects task snippets from `workflow_template/tasks/` +3. Validates all `dependsOn` references exist in the included task set +4. Writes `workflows/wf_ingestion_v1.WorkflowVersion.yaml` + +## Configuration + +```yaml +# default.config.yaml +workflow: "wf_{{location}}_ingestion" +workflowSchedule: "0 2 * * *" # Daily 02:00 UTC + +# Workflow service account credentials +workflowClientId: "${IDP_CLIENT_ID}" +workflowClientSecret: "${IDP_CLIENT_SECRET}" + +# IDP group source IDs +workflowGroupSourceId: "" # Service account group +workflowUserGroupSourceId: "" # User group (read-only monitoring) + +# Toggle which source systems are included in the workflow +enabledSources: + pi: true + opcua: false + sap: true + +# Contextualization tasks +enabledContextualization: + isaRelationships: true # equipment_to_asset, operation_to_order (isa_manufacturing_extension) + connectionSql: false # P1: enable only with qs_enterprise DM variant + +dataModelVariant: isa_manufacturing_extension +``` + +After changing `enabledSources` in `config..yaml`, re-run `build_workflow.py` and commit `wf_ingestion_v1.WorkflowVersion.yaml`. After adding or switching a data model under `modules/data_models/`, re-run `configure_datamodel.py` first. + +Example of variables written for the ISA variant: + +```yaml +variables: + modules: + common: + cdf_ingestion_foundation: + dataModelVariant: isa_manufacturing_extension + isaSchemaSpace: sp_isa_manufacturing + instanceSpace: sp_isa_instance_space + contextualization: + cdf_entity_matching: + schemaSpace: sp_isa_manufacturing + AssetViewExternalId: ISAAsset + # ... + sourcesystem: + cdf_pi_foundation: + instanceSpace: sp_isa_instance_space +``` + +A timestamped backup (`config..yaml.bak.`) is created before each `configure_datamodel.py` write. + +## Resources Created + +| Resource | External ID | Purpose | +|---|---|---| +| Group | `grp_{{location}}_workflow` | Workflow execution service account | +| Group | `grp_{{location}}_workflow_user` | Workflow monitoring users | +| Workflow | `wf_{{location}}_ingestion` | Workflow resource (holds versions) | +| WorkflowTrigger | `wf_{{location}}_ingestion_trigger` | Cron schedule | +| WorkflowVersion | `wf_{{location}}_ingestion / v1` | Task DAG definition | + +## Task Snippets + +Each file in `workflow_template/tasks/` defines one workflow task. To add a new task: + +1. Create a new `.yaml` snippet following the existing pattern +2. Add it to `resolve_task_filenames()` in `build_workflow.py` with the appropriate condition +3. Run `python scripts/build_workflow.py` to regenerate the WorkflowVersion + +Task snippet format: +```yaml +externalId: task_my_task +type: transformation +name: "My transformation task" +description: "..." +dependsOn: # omit if no dependencies + - externalId: task_other_task +parameters: + transformation: + externalId: "tr_{{location}}_my_transformation" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow +``` + +## Dependencies + +**Depends on**: +- One or more source system modules matching `enabledSources`: + - `sourcesystem/cdf_pi_foundation` (if `pi: true`) + - `sourcesystem/cdf_opcua_foundation` (if `opcua: true`) + - `sourcesystem/cdf_sap_foundation` (if `sap: true`) + +**Package**: `dp:foundation` + +## Deploy + +```bash +cdf deploy modules/common/cdf_ingestion_foundation --env your-environment +``` + +After deploying, the workflow runs on the configured cron schedule. To trigger manually: + +```bash +cdf workflows run wf_{{location}}_ingestion --version v1 --env your-environment +``` diff --git a/modules/common/cdf_ingestion_foundation/auth/grp_workflow.Group.yaml b/modules/common/cdf_ingestion_foundation/auth/grp_workflow.Group.yaml new file mode 100644 index 00000000..8b5e92c0 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/auth/grp_workflow.Group.yaml @@ -0,0 +1,26 @@ +name: "grp_{{location}}_workflow" +sourceId: "{{workflowGroupSourceId}}" +metadata: + origin: cdf-project-templates +capabilities: + - projectsAcl: + actions: [READ, LIST] + scope: + all: {} + - sessionsAcl: + actions: [CREATE] + scope: + all: {} + - workflowOrchestrationAcl: + actions: [READ, WRITE] + scope: + datasetScope: + ids: ["{{dataset}}"] + - transformationsAcl: + actions: [READ, RUN] + scope: + all: {} + - datasetsAcl: + actions: [READ] + scope: + all: {} diff --git a/modules/common/cdf_ingestion_foundation/auth/grp_workflow_user.Group.yaml b/modules/common/cdf_ingestion_foundation/auth/grp_workflow_user.Group.yaml new file mode 100644 index 00000000..a49f9ee7 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/auth/grp_workflow_user.Group.yaml @@ -0,0 +1,18 @@ +name: "grp_{{location}}_workflow_user" +sourceId: "{{workflowUserGroupSourceId}}" +metadata: + origin: cdf-project-templates +capabilities: + - projectsAcl: + actions: [READ, LIST] + scope: + all: {} + - workflowOrchestrationAcl: + actions: [READ, WRITE] + scope: + datasetScope: + ids: ["{{dataset}}"] + - datasetsAcl: + actions: [READ] + scope: + all: {} diff --git a/modules/common/cdf_ingestion_foundation/default.config.yaml b/modules/common/cdf_ingestion_foundation/default.config.yaml new file mode 100644 index 00000000..8552aca2 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/default.config.yaml @@ -0,0 +1,59 @@ +# CDF Ingestion Foundation — configuration variables + +# Site/location code — used in all externalIds (e.g. wf_{{location}}_ingestion, tr_pi_timeseries_{{location}}_to_isa) +# Must match the location value set in all deployed source system modules +location: "site1" + +# Dataset used to scope workflow group ACLs — must match the dataset used by source system modules +dataset: "ds_ingestion" + +# ── Data model destination ──────────────────────────────────────────────────── +# Schema space where ISA Manufacturing Extension views are defined. +# Must match isaSchemaSpace in the isa_manufacturing_extension model module. +isaSchemaSpace: "sp_isa_manufacturing" + +# Space where DM instances (ISAAsset, Equipment, WorkOrder, …) are written. +# Must match the instanceSpace used by the source system modules. +instanceSpace: "sp_isa_instance_space" + +# Version of the ISA Manufacturing Extension views (e.g. v1). +viewVersion: "v1" + +# ── Source dataset references ───────────────────────────────────────────────── +# dataSetExternalId used by cdf_pi_foundation — filters the timeseries() read +piDataset: "ds_pi" + +# dataSetExternalId used by cdf_opcua_foundation — filters the timeseries() read +opcuaDataset: "ds_opcua" + +# sysTagsFound is not a property on ISATimeSeries or CFIHOS views — set to false for dp:foundation. +# Set to true only if you have extended the target view with a sysTagsFound property. +populateSysTagsFound: "false" + +# Workflow external ID and schedule +workflow: "wf_{{location}}_ingestion" +workflowSchedule: "0 2 * * *" # daily 02:00 UTC; use "0 0 29 2 *" to effectively disable + +# Workflow trigger authentication — set to the workflow service account credentials +workflowClientId: "${IDP_CLIENT_ID}" +workflowClientSecret: "${IDP_CLIENT_SECRET}" + +# IDP source IDs for workflow groups +workflowGroupSourceId: "" +workflowUserGroupSourceId: "" + +# Which source system modules are deployed at this site +# Set to false for any source system not deployed +enabledSources: + pi: true + opcua: false + sap: true + +# Contextualization tasks to include in the second workflow phase +# Driven by dataModelVariant — only override if needed +enabledContextualization: + isaRelationships: true # equipment_to_asset and operation_to_order (isa_manufacturing_extension) + +# Data model variant — controls which ctx task snippets are included +# Supported: isa_manufacturing_extension | cfihos_oil_and_gas +dataModelVariant: isa_manufacturing_extension diff --git a/modules/common/cdf_ingestion_foundation/module.toml b/modules/common/cdf_ingestion_foundation/module.toml new file mode 100644 index 00000000..d3b76ce7 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/module.toml @@ -0,0 +1,4 @@ +[module] +title = "CDF Ingestion Foundation" +id = "cdf_ingestion_foundation" +package_id = "dp:foundation" diff --git a/modules/common/cdf_ingestion_foundation/scripts/_pack_config.py b/modules/common/cdf_ingestion_foundation/scripts/_pack_config.py new file mode 100644 index 00000000..ce179a44 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/scripts/_pack_config.py @@ -0,0 +1,189 @@ +"""Shared helpers for configure_datamodel.py and build_workflow.py.""" + +from __future__ import annotations + +import re +from pathlib import Path + +import yaml + +MODULE_ROOT = Path(__file__).parent.parent +REPO_ROOT = MODULE_ROOT.parent.parent.parent # scripts/ -> module/ -> modules/ -> repo/ + +KNOWN_DATA_MODEL_DIRS = ( + "isa_manufacturing_extension", + "cfihos_oil_and_gas_extension", +) + +# modules/sourcesystem/ → enabledSources key in cdf_ingestion_foundation config +SOURCE_SYSTEM_DIR_TO_ENABLED_KEY: dict[str, str] = { + "cdf_pi_foundation": "pi", + "cdf_sap_foundation": "sap", + "cdf_opcua_foundation": "opcua", + "cdf_db_foundation": "db", + "cdf_files_foundation": "files", +} + +# Sources that have population/contextualization tasks in the ingestion workflow +WORKFLOW_TASK_SOURCE_KEYS = frozenset({"pi", "sap", "opcua"}) + + +def get_org_dir_name(repo_root: Path | None = None) -> str | None: + """Return default_organization_dir from cdf.toml, or None if unset/empty.""" + root = repo_root or REPO_ROOT + toml_path = root / "cdf.toml" + if not toml_path.exists(): + return None + content = toml_path.read_text() + m = re.search(r"""default_organization_dir\s*=\s*["']([^"']*)["']""", content) + if not m: + return None + value = m.group(1).strip() + return value or None + + +def get_pack_root(repo_root: Path | None = None) -> Path: + """Repository root, or / when default_organization_dir is set.""" + root = repo_root or REPO_ROOT + org = get_org_dir_name(root) + if org: + return root / org + return root + + +def get_data_models_dir(repo_root: Path | None = None) -> Path: + """Path to modules/data_models under the pack root (org-prefixed when configured).""" + return get_pack_root(repo_root) / "modules" / "data_models" + + +def get_sourcesystem_dir(repo_root: Path | None = None) -> Path: + """Path to modules/sourcesystem under the pack root (org-prefixed when configured).""" + return get_pack_root(repo_root) / "modules" / "sourcesystem" + + +def detect_enabled_sources(repo_root: Path | None = None) -> dict[str, bool]: + """ + Scan modules/sourcesystem/ and set enabledSources flags from installed module dirs. + Returns all known keys (pi, sap, opcua, db, files); missing dirs are false. + """ + sourcesystem_dir = get_sourcesystem_dir(repo_root) + enabled = {key: False for key in SOURCE_SYSTEM_DIR_TO_ENABLED_KEY.values()} + if not sourcesystem_dir.is_dir(): + return enabled + for module_dir, key in SOURCE_SYSTEM_DIR_TO_ENABLED_KEY.items(): + if (sourcesystem_dir / module_dir).is_dir(): + enabled[key] = True + return enabled + + +def list_installed_source_system_modules(repo_root: Path | None = None) -> list[str]: + """Module directory names present under modules/sourcesystem/.""" + sourcesystem_dir = get_sourcesystem_dir(repo_root) + if not sourcesystem_dir.is_dir(): + return [] + return [ + module_dir + for module_dir in SOURCE_SYSTEM_DIR_TO_ENABLED_KEY + if (sourcesystem_dir / module_dir).is_dir() + ] + + +def get_default_env(repo_root: Path | None = None) -> str: + """Read default_env from cdf.toml (falls back to dev).""" + root = repo_root or REPO_ROOT + toml_path = root / "cdf.toml" + if toml_path.exists(): + content = toml_path.read_text() + m = re.search(r"""default_env\s*=\s*["']([^"']+)["']""", content) + if m: + return m.group(1).strip() + return "dev" + + +def find_env_configs(repo_root: Path | None = None) -> list[Path]: + """ + Discover config..yaml files (CDF Toolkit order): + 1. /config.*.yaml + 2. /config.*.yaml (when pack-root differs) + Backup files (*.bak.*) are excluded. + """ + root = repo_root or REPO_ROOT + pack_root = get_pack_root(root) + search_dirs: list[Path] = [pack_root] + if pack_root != root: + search_dirs.append(root) + + found: list[Path] = [] + seen: set[Path] = set() + for search_dir in search_dirs: + for path in sorted(search_dir.glob("config.*.yaml")): + if ".bak." in path.name: + continue + resolved = path.resolve() + if resolved not in seen: + seen.add(resolved) + found.append(path) + return found + + +def find_env_config(env: str, repo_root: Path | None = None) -> Path | None: + """Return config..yaml from discovered files, or None.""" + target = f"config.{env}.yaml" + for path in find_env_configs(repo_root): + if path.name == target: + return path + return None + + +def detect_data_model_variant(data_models_dir: Path) -> str: + """ + Detect installed data model from subdirectory names under modules/data_models/. + Raises SystemExit with a message when zero or multiple models are found. + """ + if not data_models_dir.is_dir(): + raise SystemExit( + f"ERROR: Data models directory not found: {data_models_dir}\n" + " Expected modules/data_models/ under the pack root." + ) + + present = [ + name + for name in KNOWN_DATA_MODEL_DIRS + if (data_models_dir / name).is_dir() + ] + if not present: + raise SystemExit( + f"ERROR: No supported data model found under {data_models_dir}\n" + f" Expected one of: {', '.join(KNOWN_DATA_MODEL_DIRS)}" + ) + if len(present) > 1: + raise SystemExit( + f"ERROR: Multiple data models found under {data_models_dir}: {present}\n" + " Keep only one model directory per deployment pack." + ) + return present[0] + + +def load_yaml(path: Path) -> dict: + return yaml.safe_load(path.read_text()) or {} + + +def get_ingestion_config(config: dict) -> dict: + """Return variables.modules.common.cdf_ingestion_foundation from a config file.""" + return ( + config.get("variables", {}) + .get("modules", {}) + .get("common", {}) + .get("cdf_ingestion_foundation", {}) + ) + + +def deep_merge(base: dict, overlay: dict) -> dict: + """Recursively merge overlay into base (overlay wins on conflicts).""" + result = dict(base) + for key, value in overlay.items(): + if key in result and isinstance(result[key], dict) and isinstance(value, dict): + result[key] = deep_merge(result[key], value) + else: + result[key] = value + return result diff --git a/modules/common/cdf_ingestion_foundation/scripts/build_workflow.py b/modules/common/cdf_ingestion_foundation/scripts/build_workflow.py new file mode 100644 index 00000000..a3c11465 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/scripts/build_workflow.py @@ -0,0 +1,204 @@ +""" +Generate workflows/wf_ingestion_v1.WorkflowVersion.yaml from config..yaml +(enabledSources, dataModelVariant) and workflow_template/tasks/. Run after configure_datamodel.py. + +Usage: python build_workflow.py [--env dev] [--check] +""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +import yaml +from _pack_config import ( + find_env_config, + get_default_env, + get_ingestion_config, + load_yaml, +) + +MODULE_ROOT = Path(__file__).parent.parent +TASKS_DIR = MODULE_ROOT / "workflow_template" / "tasks" +OUTPUT_FILE = MODULE_ROOT / "workflows" / "wf_ingestion_v1.WorkflowVersion.yaml" + + +def resolve_task_filenames(config: dict) -> list[str]: + """ + Return ordered task snippet filenames from ingestion foundation config. + """ + sources = config.get("enabledSources", {}) + ctx = config.get("enabledContextualization", {}) + dm = config.get("dataModelVariant", "isa_manufacturing_extension") + + tasks: list[str] = [] + + if sources.get("pi", False): + tasks.append("task.pi_timeseries.yaml") + + if sources.get("opcua", False): + tasks.append("task.opcua_timeseries.yaml") + + if sources.get("sap", False): + tasks += [ + "task.sap_assets.yaml", + "task.sap_equipment.yaml", + "task.sap_maintenance_orders.yaml", + "task.sap_operations.yaml", + ] + + if dm == "isa_manufacturing_extension": + if ctx.get("isaRelationships", True) and sources.get("sap", False): + tasks += [ + "ctx.isa_manufacturing_extension.equipment_to_asset.yaml", + "ctx.isa_manufacturing_extension.operation_to_order.yaml", + ] + elif dm == "cfihos_oil_and_gas_extension": + # CFIHOS-specific contextualization task snippets can be added here. + pass + else: + print(f"WARNING: Unknown dataModelVariant '{dm}'. No contextualization tasks added.") + print( + "Supported variants: isa_manufacturing_extension | cfihos_oil_and_gas_extension" + ) + + return tasks + + +def load_tasks(filenames: list[str]) -> list[dict]: + tasks = [] + for fn in filenames: + path = TASKS_DIR / fn + if not path.exists(): + print(f"ERROR: Task snippet not found: {path}") + sys.exit(1) + tasks.append(yaml.safe_load(path.read_text())) + return tasks + + +def validate_dependencies(tasks: list[dict]) -> None: + task_ids = {t["externalId"] for t in tasks} + for task in tasks: + for dep in task.get("dependsOn", []): + dep_id = dep.get("externalId") + if dep_id and dep_id not in task_ids: + raise ValueError( + f"\nDependency error in task '{task['externalId']}':\n" + f" dependsOn '{dep_id}' which is not in the enabled task set.\n" + f" Included tasks: {sorted(task_ids)}\n" + f" Check enabledSources / enabledContextualization in config..yaml." + ) + + +def warn_extractor_only_sources(config: dict) -> None: + """db and files foundations use extractors only — no ingestion workflow tasks.""" + sources = config.get("enabledSources", {}) + for key in ("db", "files"): + if sources.get(key): + print( + f"NOTE: enabledSources.{key}=true — module is deployed via its extractor; " + "no transformation task is added to wf_ingestion (author transforms separately)." + ) + + +def build(config: dict, env: str) -> str: + warn_extractor_only_sources(config) + filenames = resolve_task_filenames(config) + + if not filenames: + print( + f"WARNING: No tasks enabled for env '{env}'. " + "Check enabledSources in config..yaml." + ) + + tasks = load_tasks(filenames) + validate_dependencies(tasks) + + workflow_name = config.get("workflow", "wf_{{location}}_ingestion") + + workflow_version = [ + { + "workflowExternalId": workflow_name, + "version": "v1", + "workflowDefinition": { + "description": ( + "GENERATED by scripts/build_workflow.py — do not edit by hand. " + f"Env: {env}. " + f"Enabled sources: {config.get('enabledSources', {})}. " + f"DM variant: {config.get('dataModelVariant', 'isa_manufacturing_extension')}." + ), + "tasks": tasks, + }, + } + ] + + return yaml.dump( + workflow_version, sort_keys=False, allow_unicode=True, default_flow_style=False + ) + + +def load_ingestion_from_env(env: str) -> dict: + config_path = find_env_config(env) + if not config_path: + print(f"ERROR: config.{env}.yaml not found.") + print(" Run configure_datamodel.py first, or pass --env with an existing config.") + sys.exit(1) + + config = load_yaml(config_path) + ingestion = get_ingestion_config(config) + if not ingestion: + print( + f"ERROR: variables.modules.common.cdf_ingestion_foundation " + f"missing in {config_path.name}.\n" + " Run configure_datamodel.py first." + ) + sys.exit(1) + return ingestion + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--env", + default=None, + help="Environment name (config..yaml). Defaults to cdf.toml default_env.", + ) + parser.add_argument( + "--check", + action="store_true", + help="CI mode: exit 1 if generated output differs from committed WorkflowVersion.yaml", + ) + args = parser.parse_args() + + env = args.env or get_default_env() + config = load_ingestion_from_env(env) + generated = build(config, env) + + if args.check: + if not OUTPUT_FILE.exists(): + print( + f"ERROR: {OUTPUT_FILE} does not exist. Run without --check to generate it first." + ) + sys.exit(1) + committed = OUTPUT_FILE.read_text() + if generated != committed: + print( + f"ERROR: {OUTPUT_FILE.name} is out of sync with config.{env}.yaml.\n" + " Run configure_datamodel.py, then python scripts/build_workflow.py" + ) + sys.exit(1) + print(f"OK: {OUTPUT_FILE.name} is up to date (env={env}).") + else: + OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) + OUTPUT_FILE.write_text(generated) + print(f"Written: {OUTPUT_FILE} (env={env})") + task_ids = [ + t["externalId"] + for t in yaml.safe_load(generated)[0]["workflowDefinition"]["tasks"] + ] + print(f"Tasks included: {task_ids}") + + +if __name__ == "__main__": + main() diff --git a/modules/common/cdf_ingestion_foundation/scripts/configure_datamodel.py b/modules/common/cdf_ingestion_foundation/scripts/configure_datamodel.py new file mode 100644 index 00000000..25d3cef9 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/scripts/configure_datamodel.py @@ -0,0 +1,344 @@ +""" +Detect the installed data model under modules/data_models/ and source systems under +modules/sourcesystem/, then sync config..yaml. + +Scans /modules/data_models/ for isa_manufacturing_extension or +cfihos_oil_and_gas_extension, and /modules/sourcesystem/ for foundation +modules (pi, sap, opcua, db, files). Updates enabledSources, contextualization, +sourcesystem, ingestion foundation, and data_models in every config..yaml. + +Run this before build_workflow.py. + +Usage: + python configure_datamodel.py [-y] [--check] +""" + +from __future__ import annotations + +import argparse +import shutil +import sys +from datetime import datetime +from pathlib import Path + +import yaml +from _pack_config import ( + REPO_ROOT, + SOURCE_SYSTEM_DIR_TO_ENABLED_KEY, + deep_merge, + detect_data_model_variant, + detect_enabled_sources, + find_env_configs, + get_data_models_dir, + get_pack_root, + get_sourcesystem_dir, + list_installed_source_system_modules, + load_yaml, +) + +# Per-variant overrides for contextualization modules (None → use instance_space). +CONTEXTUALIZATION_VARIABLES: dict[str, dict[str, dict]] = { + "isa_manufacturing_extension": { + "cdf_entity_matching": { + "schemaSpace": "sp_isa_manufacturing", + "assetInstanceSpace": None, + "timeseriesInstanceSpace": None, + "AssetViewExternalId": "ISAAsset", + "TimeSeriesViewExternalId": "ISATimeSeries", + "targetViewExternalId": "ISAAsset", + "entityViewExternalId": "ISATimeSeries", + "targetViewSearchProperty": "name", + "targetViewFilterValues": [], + "entityViewSearchProperty": "name", + "entityViewFilterValues": [], + "reservedWordPrefix": "", + }, + "cdf_file_annotation": { + "fileSchemaSpace": "sp_isa_manufacturing", + "fileInstanceSpace": None, + "fileExternalId": "ISAFile", + "targetEntitySchemaSpace": "sp_isa_manufacturing", + "targetEntityInstanceSpace": None, + "targetEntityExternalId": "ISAAsset", + }, + }, + "cfihos_oil_and_gas_extension": { + "cdf_entity_matching": { + "schemaSpace": "dm_dom_oil_and_gas", + "assetInstanceSpace": None, + "timeseriesInstanceSpace": None, + "AssetViewExternalId": "FunctionalLocation", + "TimeSeriesViewExternalId": "TimeSeriesData", + "targetViewExternalId": "FunctionalLocation", + "entityViewExternalId": "TimeSeriesData", + "targetViewSearchProperty": "name", + "targetViewFilterValues": [], + "entityViewSearchProperty": "name", + "entityViewFilterValues": [], + "reservedWordPrefix": "", + }, + "cdf_file_annotation": { + "fileSchemaSpace": "dm_dom_oil_and_gas", + "fileInstanceSpace": None, + "fileExternalId": "Files", + "targetEntitySchemaSpace": "dm_dom_oil_and_gas", + "targetEntityInstanceSpace": None, + "targetEntityExternalId": "FunctionalLocation", + }, + }, +} + +INGESTION_FOUNDATION_VARIABLES: dict[str, dict[str, str]] = { + "isa_manufacturing_extension": { + "dataModelVariant": "isa_manufacturing_extension", + "isaSchemaSpace": "sp_isa_manufacturing", + "instanceSpace": "sp_isa_instance_space", + }, + "cfihos_oil_and_gas_extension": { + "dataModelVariant": "cfihos_oil_and_gas_extension", + "isaSchemaSpace": "dm_dom_oil_and_gas", + "instanceSpace": "sp_cfihos_instance_space", + }, +} + +DATA_MODELS_MODULE_VARIABLES: dict[str, dict[str, str]] = { + "isa_manufacturing_extension": { + "isaSchemaSpace": "sp_isa_manufacturing", + "isaInstanceSpace": "sp_isa_instance_space", + }, + "cfihos_oil_and_gas_extension": { + "isaSchemaSpace": "dm_dom_oil_and_gas", + "isaInstanceSpace": "sp_cfihos_instance_space", + }, +} + +def resolve_contextualization_variables( + variant: str, instance_space: str +) -> dict[str, dict]: + templates = CONTEXTUALIZATION_VARIABLES[variant] + return { + module: { + key: (instance_space if value is None else value) + for key, value in overrides.items() + } + for module, overrides in templates.items() + } + + +def resolve_sourcesystem_variables( + instance_space: str, installed_modules: list[str] +) -> dict[str, dict]: + """instanceSpace for each installed source system module only.""" + return { + module: {"instanceSpace": instance_space} + for module in installed_modules + } + + +def build_overlay(variant: str, enabled_sources: dict[str, bool]) -> dict: + ingestion = INGESTION_FOUNDATION_VARIABLES[variant] + instance_space = ingestion["instanceSpace"] + installed_modules = list_installed_source_system_modules() + + return { + "variables": { + "modules": { + "common": { + "cdf_ingestion_foundation": { + **ingestion, + "enabledSources": enabled_sources, + }, + }, + "contextualization": resolve_contextualization_variables( + variant, instance_space + ), + "sourcesystem": resolve_sourcesystem_variables( + instance_space, installed_modules + ), + "data_models": { + variant: DATA_MODELS_MODULE_VARIABLES[variant], + }, + }, + }, + } + + +def collect_expected(variant: str, enabled_sources: dict[str, bool]) -> dict: + """Flat map of dotted paths → expected values for --check.""" + overlay = build_overlay(variant, enabled_sources) + modules = overlay["variables"]["modules"] + expected: dict[str, object] = {} + + for key, value in modules["common"]["cdf_ingestion_foundation"].items(): + expected[f"common.cdf_ingestion_foundation.{key}"] = value + + for module, overrides in modules["contextualization"].items(): + for key, value in overrides.items(): + expected[f"contextualization.{module}.{key}"] = value + + for module, overrides in modules["sourcesystem"].items(): + for key, value in overrides.items(): + expected[f"sourcesystem.{module}.{key}"] = value + + for key, value in modules["data_models"][variant].items(): + expected[f"data_models.{variant}.{key}"] = value + + return expected + + +def get_actual_value(config: dict, dotted: str) -> object: + parts = dotted.split(".") + node = config.get("variables", {}).get("modules", {}) + for part in parts: + if not isinstance(node, dict) or part not in node: + return None + node = node[part] + return node + + +def check_config_file( + path: Path, variant: str, enabled_sources: dict[str, bool] +) -> list[str]: + config = load_yaml(path) + errors: list[str] = [] + for dotted, expected_value in collect_expected(variant, enabled_sources).items(): + actual = get_actual_value(config, dotted) + if actual != expected_value: + errors.append(f" {dotted}: got {actual!r}, expected {expected_value!r}") + return errors + + +def print_summary( + variant: str, + data_models_dir: Path, + sourcesystem_dir: Path, + pack_root: Path, + enabled_sources: dict[str, bool], +) -> None: + overlay = build_overlay(variant, enabled_sources) + ingestion = overlay["variables"]["modules"]["common"]["cdf_ingestion_foundation"] + print(f"\n Detected data model : {variant}") + print(f" Data models path : {data_models_dir.relative_to(REPO_ROOT)}") + print(f" Source systems path : {sourcesystem_dir.relative_to(REPO_ROOT)}") + print(f" Pack root : {pack_root.relative_to(REPO_ROOT)}") + print(f" dataModelVariant : {ingestion['dataModelVariant']}") + print(f" isaSchemaSpace : {ingestion['isaSchemaSpace']}") + print(f" instanceSpace : {ingestion['instanceSpace']}") + print(f" enabledSources : {enabled_sources}") + + print("\n Contextualization (variables.modules.contextualization):") + for module, overrides in overlay["variables"]["modules"]["contextualization"].items(): + print(f"\n [{module}]") + for key, value in overrides.items(): + print(f" {key}: {value!r}") + + print("\n Source systems (variables.modules.sourcesystem):") + for module, overrides in overlay["variables"]["modules"]["sourcesystem"].items(): + print(f" {module}.instanceSpace: {overrides['instanceSpace']!r}") + print() + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--check", + action="store_true", + help="CI mode: exit 1 if any discovered config file is out of sync", + ) + parser.add_argument( + "--yes", + "-y", + action="store_true", + help="Skip confirmation prompt and apply immediately", + ) + args = parser.parse_args() + + pack_root = get_pack_root() + data_models_dir = get_data_models_dir() + sourcesystem_dir = get_sourcesystem_dir() + variant = detect_data_model_variant(data_models_dir) + enabled_sources = detect_enabled_sources() + + if not any(enabled_sources.values()): + print( + f"WARNING: No source system modules found under {sourcesystem_dir}\n" + f" Expected subdirectories: {', '.join(SOURCE_SYSTEM_DIR_TO_ENABLED_KEY)}" + ) + + if variant not in CONTEXTUALIZATION_VARIABLES: + print(f"ERROR: No variable template for variant '{variant}'.") + sys.exit(1) + + config_files = find_env_configs() + if not config_files: + print( + "WARNING: No config..yaml files found.\n" + f" Searched pack root: {pack_root}" + ) + sys.exit(0) + + if args.check: + all_errors: dict[str, list[str]] = {} + for path in config_files: + errs = check_config_file(path, variant, enabled_sources) + if errs: + all_errors[path.name] = errs + + if all_errors: + print( + f"ERROR: Config file(s) out of sync with detected model '{variant}':\n" + ) + for filename, errs in all_errors.items(): + print(f" {filename}") + for e in errs: + print(e) + print( + "\n Run: python scripts/configure_datamodel.py -y" + ) + sys.exit(1) + + print( + f"OK: All {len(config_files)} config file(s) match '{variant}'." + ) + return + + print_summary(variant, data_models_dir, sourcesystem_dir, pack_root, enabled_sources) + print(f" Config files to update ({len(config_files)}):") + for p in config_files: + print(f" {p.relative_to(REPO_ROOT)}") + print() + + if not args.yes: + try: + answer = ( + input(" Apply these overrides to all config files above? [y/N] ") + .strip() + .lower() + ) + except EOFError: + answer = "n" + if answer not in ("y", "yes"): + print(" Aborted — no changes written.") + sys.exit(0) + + overlay = build_overlay(variant, enabled_sources) + for path in config_files: + existing = load_yaml(path) + updated = deep_merge(existing, overlay) + new_yaml = yaml.dump( + updated, sort_keys=False, allow_unicode=True, default_flow_style=False + ) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = path.with_name(f"{path.stem}.yaml.bak.{timestamp}") + shutil.copy2(path, backup_path) + + path.write_text(new_yaml) + print(f" Updated : {path.relative_to(REPO_ROOT)} (backup: {backup_path.name})") + + print(f"\n Done — {len(config_files)} file(s) updated for variant={variant}") + print(" Next: python scripts/build_workflow.py") + + +if __name__ == "__main__": + main() diff --git a/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.sql new file mode 100644 index 00000000..75bcd766 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Equipment → ISAAsset relation +-- TODO: Add SELECT joining equipment to functional_location on Floc field, setting Equipment.asset edge diff --git a/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.yaml new file mode 100644 index 00000000..3e420444 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_equipment_to_asset.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_equipment_{{location}}_to_asset" +dataSetExternalId: "{{dataset}}" +name: "SAP Equipment → ISAAsset relation ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: Equipment + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.sql new file mode 100644 index 00000000..f4339a6d --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Notifications → WorkOrder relation +-- TODO: Add SELECT joining workitem to workorder on OrderId field, setting Operation.workOrder edge diff --git a/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.yaml new file mode 100644 index 00000000..c2eefd16 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/contextualization/sap_operation_to_order.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_operation_{{location}}_to_order" +dataSetExternalId: "{{dataset}}" +name: "SAP Notification → WorkOrder relation ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: Operation + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.sql new file mode 100644 index 00000000..80b4b8cd --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.sql @@ -0,0 +1,2 @@ +-- OPC-UA TimeSeries → ISATimeSeries +-- TODO: Add SELECT from `db_{{location}}_opcua.timeseries` RAW table, mapping to ISATimeSeries view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.yaml new file mode 100644 index 00000000..105f0cd7 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/opcua_timeseries.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_opcua_timeseries_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "OPC-UA TimeSeries → ISATimeSeries ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: ISATimeSeries + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.sql new file mode 100644 index 00000000..9470ebb9 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.sql @@ -0,0 +1,2 @@ +-- PI TimeSeries → ISATimeSeries +-- TODO: Add SELECT from timeseries() filtered by {{piDataset}}, mapping to ISATimeSeries view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.yaml new file mode 100644 index 00000000..0ce15163 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/pi_timeseries.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_pi_timeseries_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "PI TimeSeries → ISATimeSeries ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: ISATimeSeries + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.sql new file mode 100644 index 00000000..3706b196 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Functional Locations → ISAAsset +-- TODO: Add SELECT from `db_{{location}}_sap.functional_location` RAW table, mapping to ISAAsset view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.yaml new file mode 100644 index 00000000..956dd8c4 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_assets.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_assets_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "SAP Functional Locations → ISAAsset ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: ISAAsset + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.sql new file mode 100644 index 00000000..465f56dc --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Equipment → Equipment +-- TODO: Add SELECT from `db_{{location}}_sap.equipment` RAW table, mapping to Equipment view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.yaml new file mode 100644 index 00000000..0fc1f9b4 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_equipment.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_equipment_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "SAP Equipment → Equipment ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: Equipment + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.sql new file mode 100644 index 00000000..52fa791b --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Work Orders → WorkOrder +-- TODO: Add SELECT from `db_{{location}}_sap.workorder` RAW table, mapping to WorkOrder view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.yaml new file mode 100644 index 00000000..ab6443c6 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_maintenance_orders.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_maintenance_orders_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "SAP Work Orders → WorkOrder ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: WorkOrder + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.sql b/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.sql new file mode 100644 index 00000000..686138c8 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.sql @@ -0,0 +1,2 @@ +-- SAP Notifications → Operation +-- TODO: Add SELECT from `db_{{location}}_sap.workitem` RAW table, mapping to Operation view fields diff --git a/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.yaml b/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.yaml new file mode 100644 index 00000000..d7bdd94f --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/transformations/population/sap_operations.Transformation.yaml @@ -0,0 +1,13 @@ +externalId: "tr_sap_operations_{{location}}_to_isa" +dataSetExternalId: "{{dataset}}" +name: "SAP Notifications → Operation ({{location}})" +destination: + type: nodes + view: + space: "{{isaSchemaSpace}}" + externalId: Operation + version: "{{viewVersion}}" + instanceSpace: "{{instanceSpace}}" +conflictMode: upsert +isPublic: true +ignoreNullFields: true diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.equipment_to_asset.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.equipment_to_asset.yaml new file mode 100644 index 00000000..77ea4223 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.equipment_to_asset.yaml @@ -0,0 +1,14 @@ +externalId: ctx_isa_equipment_to_asset +type: transformation +name: "CTX: Equipment → Asset relation" +description: "Set Equipment.asset property linking each equipment to its functional location ISAAsset" +dependsOn: + - externalId: task_sap_assets + - externalId: task_sap_equipment +parameters: + transformation: + externalId: "tr_sap_equipment_{{location}}_to_asset" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.operation_to_order.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.operation_to_order.yaml new file mode 100644 index 00000000..0b6202b4 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/ctx.isa_manufacturing_extension.operation_to_order.yaml @@ -0,0 +1,14 @@ +externalId: ctx_isa_operation_to_order +type: transformation +name: "CTX: Operation → WorkOrder relation" +description: "Set Operation.workOrder property linking each operation to its parent work order" +dependsOn: + - externalId: task_sap_maintenance_orders + - externalId: task_sap_operations +parameters: + transformation: + externalId: "tr_sap_operation_{{location}}_to_order" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.opcua_timeseries.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.opcua_timeseries.yaml new file mode 100644 index 00000000..9c6b6c00 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.opcua_timeseries.yaml @@ -0,0 +1,11 @@ +externalId: task_opcua_timeseries +type: transformation +name: "OPC-UA: RAW → ISATimeSeries" +description: "Transform OPC-UA node metadata from RAW into ISATimeSeries DM instances" +parameters: + transformation: + externalId: "tr_opcua_timeseries_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.pi_timeseries.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.pi_timeseries.yaml new file mode 100644 index 00000000..01c7ff0e --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.pi_timeseries.yaml @@ -0,0 +1,11 @@ +externalId: task_pi_timeseries +type: transformation +name: "PI: TimeSeries → ISATimeSeries" +description: "Promote PI ExtractorTimeSeries instances to ISATimeSeries DM nodes (direct-write, no RAW staging)" +parameters: + transformation: + externalId: "tr_pi_timeseries_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_assets.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_assets.yaml new file mode 100644 index 00000000..6bb6625c --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_assets.yaml @@ -0,0 +1,11 @@ +externalId: task_sap_assets +type: transformation +name: "SAP: Functional Locations → ISAAsset" +description: "Transform SAP functional locations from RAW into ISAAsset DM instances" +parameters: + transformation: + externalId: "tr_sap_assets_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_equipment.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_equipment.yaml new file mode 100644 index 00000000..f8decc8e --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_equipment.yaml @@ -0,0 +1,13 @@ +externalId: task_sap_equipment +type: transformation +name: "SAP: Equipment → Equipment" +description: "Transform SAP equipment master records from RAW into Equipment DM instances" +dependsOn: + - externalId: task_sap_assets +parameters: + transformation: + externalId: "tr_sap_equipment_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_maintenance_orders.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_maintenance_orders.yaml new file mode 100644 index 00000000..168768aa --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_maintenance_orders.yaml @@ -0,0 +1,13 @@ +externalId: task_sap_maintenance_orders +type: transformation +name: "SAP: Work Orders → WorkOrder" +description: "Transform SAP work orders from RAW into WorkOrder DM instances" +dependsOn: + - externalId: task_sap_assets +parameters: + transformation: + externalId: "tr_sap_maintenance_orders_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_operations.yaml b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_operations.yaml new file mode 100644 index 00000000..ac5812eb --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflow_template/tasks/task.sap_operations.yaml @@ -0,0 +1,13 @@ +externalId: task_sap_operations +type: transformation +name: "SAP: Work Tasks → Operation" +description: "Transform SAP work tasks from RAW into Operation DM instances" +dependsOn: + - externalId: task_sap_maintenance_orders +parameters: + transformation: + externalId: "tr_sap_operations_{{location}}_to_isa" + concurrencyPolicy: fail +retries: 1 +timeout: 1800 +onFailure: abortWorkflow diff --git a/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion.Workflow.yaml b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion.Workflow.yaml new file mode 100644 index 00000000..1a3f190d --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion.Workflow.yaml @@ -0,0 +1,4 @@ +externalId: "{{workflow}}" +name: "{{workflow}}" +dataSetExternalId: "{{dataset}}" +description: "Two-phase ingestion workflow for site {{location}}: population → contextualization" diff --git a/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_trigger.WorkflowTrigger.yaml b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_trigger.WorkflowTrigger.yaml new file mode 100644 index 00000000..4bca52aa --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_trigger.WorkflowTrigger.yaml @@ -0,0 +1,9 @@ +- externalId: "{{workflow}}_trigger" + triggerRule: + triggerType: schedule + cronExpression: "{{workflowSchedule}}" + workflowExternalId: "{{workflow}}" + workflowVersion: v1 + authentication: + clientId: "{{workflowClientId}}" + clientSecret: "{{workflowClientSecret}}" diff --git a/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_v1.WorkflowVersion.yaml b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_v1.WorkflowVersion.yaml new file mode 100644 index 00000000..22daed51 --- /dev/null +++ b/modules/common/cdf_ingestion_foundation/workflows/wf_ingestion_v1.WorkflowVersion.yaml @@ -0,0 +1,100 @@ +- workflowExternalId: wf_{{location}}_ingestion + version: v1 + workflowDefinition: + description: 'GENERATED by scripts/build_workflow.py — do not edit by hand. Env: + dev. Enabled sources: {''pi'': True, ''opcua'': True, ''sap'': True, ''db'': + True, ''files'': True}. DM variant: isa_manufacturing_extension.' + tasks: + - externalId: task_pi_timeseries + type: transformation + name: 'PI: TimeSeries → ISATimeSeries' + description: Promote PI ExtractorTimeSeries instances to ISATimeSeries DM nodes + (direct-write, no RAW staging) + parameters: + transformation: + externalId: tr_pi_timeseries_{{location}}_to_isa + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: task_sap_assets + type: transformation + name: 'SAP: Functional Locations → ISAAsset' + description: Transform SAP functional locations from RAW into ISAAsset DM instances + parameters: + transformation: + externalId: tr_sap_assets_{{location}}_to_isa + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: task_sap_equipment + type: transformation + name: 'SAP: Equipment → Equipment' + description: Transform SAP equipment master records from RAW into Equipment + DM instances + dependsOn: + - externalId: task_sap_assets + parameters: + transformation: + externalId: tr_sap_equipment_{{location}}_to_isa + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: task_sap_maintenance_orders + type: transformation + name: 'SAP: Work Orders → WorkOrder' + description: Transform SAP work orders from RAW into WorkOrder DM instances + dependsOn: + - externalId: task_sap_assets + parameters: + transformation: + externalId: tr_sap_maintenance_orders_{{location}}_to_isa + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: task_sap_operations + type: transformation + name: 'SAP: Work Tasks → Operation' + description: Transform SAP work tasks from RAW into Operation DM instances + dependsOn: + - externalId: task_sap_maintenance_orders + parameters: + transformation: + externalId: tr_sap_operations_{{location}}_to_isa + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: ctx_isa_equipment_to_asset + type: transformation + name: 'CTX: Equipment → Asset relation' + description: Set Equipment.asset property linking each equipment to its functional + location ISAAsset + dependsOn: + - externalId: task_sap_assets + - externalId: task_sap_equipment + parameters: + transformation: + externalId: tr_sap_equipment_{{location}}_to_asset + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow + - externalId: ctx_isa_operation_to_order + type: transformation + name: 'CTX: Operation → WorkOrder relation' + description: Set Operation.workOrder property linking each operation to its + parent work order + dependsOn: + - externalId: task_sap_maintenance_orders + - externalId: task_sap_operations + parameters: + transformation: + externalId: tr_sap_operation_{{location}}_to_order + concurrencyPolicy: fail + retries: 1 + timeout: 1800 + onFailure: abortWorkflow