diff --git a/pipeline/terraform/main.tf b/pipeline/terraform/main.tf index 862636af2..8f05f4753 100644 --- a/pipeline/terraform/main.tf +++ b/pipeline/terraform/main.tf @@ -215,37 +215,6 @@ resource "google_cloud_run_v2_service" "ingestion_helper" { depends_on = [google_project_service.services] } -resource "google_cloud_run_v2_service" "import_helper" { - name = "import-helper-service" - location = var.region - project = var.project_id - - template { - service_account = google_service_account.automation_sa.email - containers { - image = "${var.artifact_registry_url}/datacommons-import-helper:latest" - env { - name = "PROJECT_ID" - value = var.project_id - } - env { - name = "LOCATION" - value = var.region - } - env { - name = "GCS_BUCKET_ID" - value = google_storage_bucket.import_bucket.name - } - env { - name = "INGESTION_HELPER_URL" - value = google_cloud_run_v2_service.ingestion_helper.uri - } - } - } - - depends_on = [google_project_service.services] -} - # --- Cloud Workflows --- resource "google_workflows_workflow" "import_automation_workflow" { @@ -363,7 +332,7 @@ resource "google_pubsub_subscription" "import_automation_sub" { filter = "attributes.transfer_status=\"TRANSFER_COMPLETED\"" push_config { - push_endpoint = google_cloud_run_v2_service.import_helper.uri + push_endpoint = google_cloud_run_v2_service.ingestion_helper.uri oidc_token { service_account_email = google_service_account.automation_sa.email } diff --git a/pipeline/workflow/cloudbuild.yaml b/pipeline/workflow/cloudbuild.yaml index 9ec54f863..3f7390552 100644 --- a/pipeline/workflow/cloudbuild.yaml +++ b/pipeline/workflow/cloudbuild.yaml @@ -41,11 +41,6 @@ steps: args: ['builds', 'submit', 'ingestion-helper', '--config', 'ingestion-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}'] dir: 'pipeline/workflow' -- id: 'build-import-helper' - name: 'gcr.io/cloud-builders/gcloud' - args: ['builds', 'submit', 'import-helper', '--config', 'import-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}'] - dir: 'pipeline/workflow' - - id: 'build-dataflow-template' name: 'gcr.io/cloud-builders/gcloud' args: diff --git a/pipeline/workflow/deploy-services.yaml b/pipeline/workflow/deploy-services.yaml index 8316796bd..930a13d33 100644 --- a/pipeline/workflow/deploy-services.yaml +++ b/pipeline/workflow/deploy-services.yaml @@ -37,10 +37,6 @@ steps: name: 'gcr.io/cloud-builders/gcloud' args: ['run', 'deploy', 'ingestion-helper-service', '--image', '${_AR_REPO_URL}/datacommons-ingestion-helper:${_VERSION}', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--timeout', '60m', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},BQ_DATASET_ID=${_BQ_DATASET_ID},BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID}'] -- id: 'import-helper-service' - name: 'gcr.io/cloud-builders/gcloud' - args: ['run', 'deploy', 'import-helper-service', '--image', '${_AR_REPO_URL}/datacommons-import-helper:${_VERSION}', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},PROJECT_NUMBER=${_PROJECT_NUMBER},GCS_BUCKET_ID=${_GCS_BUCKET_ID}'] - - id: 'import-automation-workflow' name: 'gcr.io/cloud-builders/gcloud' args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},PROJECT_NUMBER=${_PROJECT_NUMBER}'] diff --git a/pipeline/workflow/import-helper/Dockerfile b/pipeline/workflow/import-helper/Dockerfile deleted file mode 100644 index 2473221b8..000000000 --- a/pipeline/workflow/import-helper/Dockerfile +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -FROM python:3.12-slim - -# Copy uv binary -COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ - -# Allow statements and log messages to immediately appear in the logs -ENV PYTHONUNBUFFERED True - -WORKDIR /app - -# Copy local code to the container image. -COPY . . - -# Install production dependencies using uv. -RUN uv pip install --system --no-cache -r requirements.txt - -# Run the functions framework -CMD ["functions-framework", "--target", "handle_feed_event"] diff --git a/pipeline/workflow/import-helper/cloudbuild.yaml b/pipeline/workflow/import-helper/cloudbuild.yaml deleted file mode 100644 index 7f8c208ac..000000000 --- a/pipeline/workflow/import-helper/cloudbuild.yaml +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -steps: - # Build the container image - - name: 'gcr.io/cloud-builders/docker' - args: ['build', '-t', '${_AR_REPO_URL}/${_IMAGE_NAME}:${_VERSION}', '.'] - - # Push the container image - - name: 'gcr.io/cloud-builders/docker' - args: ['push', '${_AR_REPO_URL}/${_IMAGE_NAME}:${_VERSION}'] - -substitutions: - _AR_REPO_URL: 'us-docker.pkg.dev/datcom-ci/gcr.io' - _IMAGE_NAME: 'datacommons-import-helper' - _VERSION: 'latest' - -images: - - '${_AR_REPO_URL}/${_IMAGE_NAME}:${_VERSION}' diff --git a/pipeline/workflow/import-helper/main.py b/pipeline/workflow/import-helper/main.py deleted file mode 100644 index c825ec2bf..000000000 --- a/pipeline/workflow/import-helper/main.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functions_framework -import logging -from datetime import datetime, timezone -import import_helper as helper - -logging.getLogger().setLevel(logging.INFO) - - -# Triggered from a message on a Cloud Pub/Sub topic. -@functions_framework.http -def handle_feed_event(request): - # Updates status in spanner and triggers ingestion workflow - # for an import using CDA feed - message = helper.parse_message(request) - if not message: - return 'Invalid Pub/Sub message format', 400 - - attributes = message.get('attributes', {}) - message_id = message.get('messageId', '') - if attributes.get('transfer_status') != 'TRANSFER_COMPLETED': - return 'OK', 200 - - duplicate = helper.check_duplicate(message_id) - if duplicate: - logging.info(f"Message {message_id} already processed. Skipping.") - return 'OK', 200 - - import_name = attributes.get('import_name') - latest_version = attributes.get( - 'import_version', - datetime.now(timezone.utc).strftime("%Y-%m-%d")) - import_step = attributes.get('import_step', '') - graph_path = attributes.get('graph_path', "/**/*.mcf*") - import_size = attributes.get('import_size', '') - cron_schedule = attributes.get('cron_schedule', '') - if import_step == 'ingestion_workflow_single' or import_step == 'ingestion_workflow_batch': - import_status = 'STAGING' - job_id = attributes.get('feed_name', 'cda_feed') - helper.update_import_status(import_name, import_status, latest_version, - graph_path, job_id, cron_schedule) - if import_step == 'ingestion_workflow_single': - # Invoke ingestion workflow to trigger dataflow job - helper.invoke_spanner_ingestion_workflow(import_name) - elif import_step == 'import_automation_job' or import_step == 'import_automation_e2e': - # Invoke batch import job and optionally ingestion workflow to trigger dataflow job - run_ingestion = True if import_step == 'import_automation_e2e' else False - helper.invoke_import_automation_workflow(import_name, latest_version, - import_size, graph_path, - cron_schedule, run_ingestion) - else: - logging.info(f"Skipping import post processing.") - - return 'OK', 200 diff --git a/pipeline/workflow/import-helper/requirements.txt b/pipeline/workflow/import-helper/requirements.txt deleted file mode 100644 index 9d321e815..000000000 --- a/pipeline/workflow/import-helper/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -functions-framework==3.* -google-cloud-workflows -google-auth -requests -google-cloud-storage -croniter diff --git a/pipeline/workflow/import-helper/import_helper.py b/pipeline/workflow/ingestion-helper/feed_event_utils.py similarity index 51% rename from pipeline/workflow/import-helper/import_helper.py rename to pipeline/workflow/ingestion-helper/feed_event_utils.py index 50aeaf084..ecc32f31f 100644 --- a/pipeline/workflow/import-helper/import_helper.py +++ b/pipeline/workflow/ingestion-helper/feed_event_utils.py @@ -18,34 +18,29 @@ import os import croniter from datetime import datetime, timezone -from google.auth.transport.requests import Request -from google.oauth2 import id_token from google.cloud import storage from google.cloud.workflows import executions_v1 -import requests +import import_utils logging.getLogger().setLevel(logging.INFO) -PROJECT_ID = os.environ.get('PROJECT_ID') -PROJECT_NUMBER = os.environ.get('PROJECT_NUMBER') -LOCATION = os.environ.get('LOCATION') -GCS_BUCKET_ID = os.environ.get('GCS_BUCKET_ID') -INGESTION_HELPER_URL = f"https://ingestion-helper-service-{PROJECT_NUMBER}.{LOCATION}.run.app" SPANNER_INGESTION_WORKFLOW_ID = 'spanner-ingestion-workflow' IMPORT_AUTOMATION_WORKFLOW_ID = 'import-automation-workflow' -def invoke_spanner_ingestion_workflow(import_name: str): +def invoke_spanner_ingestion_workflow(project_id: str, location: str, import_name: str): """Triggers the spanner ingestion workflow. Args: + project_id: GCP project ID. + location: Workflow location. import_name: The name of the import. """ workflow_args = {"importList": [import_name.split(':')[-1]]} logging.info(f"Invoking {SPANNER_INGESTION_WORKFLOW_ID} for {import_name}") execution_client = executions_v1.ExecutionsClient() - parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{SPANNER_INGESTION_WORKFLOW_ID}" + parent = f"projects/{project_id}/locations/{location}/workflows/{SPANNER_INGESTION_WORKFLOW_ID}" execution_req = executions_v1.Execution(argument=json.dumps(workflow_args)) response = execution_client.create_execution(parent=parent, execution=execution_req) @@ -54,14 +49,19 @@ def invoke_spanner_ingestion_workflow(import_name: str): ) -def invoke_import_automation_workflow(import_name: str, +def invoke_import_automation_workflow(project_id: str, + location: str, + import_name: str, latest_version: str, import_size: str, - graph_path: str, cron_schedule: str, + graph_path: str, + cron_schedule: str, run_ingestion: bool = False): """Triggers the import automation workflow. Args: + project_id: GCP project ID. + location: Workflow location. import_name: The name of the import. latest_version: The version of the import. import_size: The size of the import ('small', 'medium', 'large'). @@ -91,7 +91,7 @@ def invoke_import_automation_workflow(import_name: str, logging.info(f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}") execution_client = executions_v1.ExecutionsClient() - parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}" + parent = f"projects/{project_id}/locations/{location}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}" execution_req = executions_v1.Execution(argument=json.dumps(workflow_args)) response = execution_client.create_execution(parent=parent, execution=execution_req) @@ -100,64 +100,60 @@ def invoke_import_automation_workflow(import_name: str, ) -def update_import_status(import_name, - import_status, - import_version, - graph_path, - job_id, - cron_schedule=None): - """Updates the status for the specified import job. +def calculate_next_refresh(cron_schedule: str) -> str: + """Calculates next refresh ISO timestamp from cron schedule. Args: - import_name: The name of the import. - import_status: The new status of the import. - import_version: The version of the import. - graph_path: The graph path for the import. - job_id: The job ID associated with the import. - cron_schedule: The cron schedule for the import (optional). + cron_schedule: The cron schedule for the import. + + Returns: + The next refresh timestamp as an ISO-8601 string, or current UTC timestamp if error. """ - logging.info(f"Updating {import_name} status: {import_status}") - latest_version = 'gs://' + GCS_BUCKET_ID + '/' + import_name.replace( - ':', '/') + '/' + import_version - request = { - 'actionType': 'update_import_status', - 'importName': import_name, - 'status': import_status, - 'job_id': job_id, - 'latestVersion': latest_version, - 'graphPath': graph_path - } if cron_schedule: try: - next_refresh = croniter.croniter( + return croniter.croniter( cron_schedule, datetime.now(timezone.utc)).get_next(datetime).isoformat() - request['nextRefresh'] = next_refresh - except (croniter.CroniterError) as e: + except Exception as e: logging.error( f"Error calculating next refresh from schedule '{cron_schedule}': {e}" ) - logging.info(f"Update request: {request}") - auth_req = Request() - token = id_token.fetch_id_token(auth_req, INGESTION_HELPER_URL) - headers = {'Authorization': f'Bearer {token}'} - response = requests.post(INGESTION_HELPER_URL, - json=request, - headers=headers) - response.raise_for_status() - logging.info(f"Updated status for {import_name}") - - -def parse_message(request) -> dict: + return datetime.now(timezone.utc).isoformat() + + +def check_duplicate(message_id: str, gcs_bucket_id: str) -> bool: + """Checks for duplicate messages using a GCS file. + + Args: + message_id: The ID of the message to check. + gcs_bucket_id: The GCS bucket ID to check against. + + Returns: + True if the message is a duplicate, False otherwise. + """ + duplicate = False + if not message_id: + return duplicate + logging.info(f"Checking for existing message: {message_id}") + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket_id) + blob = bucket.blob(f"google3/transfers/{message_id}") + try: + blob.upload_from_string("", if_generation_match=0) + except Exception: + duplicate = True + return duplicate + + +def parse_message(request_json: dict) -> dict: """Processes the incoming Pub/Sub message. Args: - request: The flask request object. + request_json: The request JSON dictionary. Returns: A dictionary containing the message data, or None if invalid. """ - request_json = request.get_json(silent=True) if not request_json or 'message' not in request_json: logging.error('Invalid Pub/Sub message format') return None @@ -165,7 +161,7 @@ def parse_message(request) -> dict: pubsub_message = request_json['message'] logging.info(f"Received Pub/Sub message: {pubsub_message}") try: - data_bytes = base64.b64decode(pubsub_message["data"]) + data_bytes = base64.b64decode(pubsub_message.get("data", "")) notification_json = data_bytes.decode("utf-8") logging.info(f"Notification content: {notification_json}") except Exception as e: @@ -174,24 +170,80 @@ def parse_message(request) -> dict: return pubsub_message -def check_duplicate(message_id: str): - """Checks for duplicate messages using a GCS file. +def handle_feed_event(request_json: dict, + spanner, + storage, + project_id: str, + location: str, + gcs_bucket_id: str, + is_base_dc: bool) -> tuple: + """Handles GCS CDA Pub/Sub feed events and invokes workflows. Args: - message_id: The ID of the message to check. + request_json: The incoming Pub/Sub JSON payload. + spanner: Instantiated SpannerClient object. + storage: Instantiated StorageClient object. + project_id: GCP Project ID. + location: Cloud location. + gcs_bucket_id: The GCS bucket ID. + is_base_dc: Flag indicating if this is base DC. Returns: - True if the message is a duplicate, False otherwise. + A tuple of (response_message, HTTP_status_code). """ - duplicate = False - if not message_id: - return duplicate - logging.info(f"Checking for existing message: {message_id}") - storage_client = storage.Client() - bucket = storage_client.bucket(GCS_BUCKET_ID) - blob = bucket.blob(f"google3/transfers/{message_id}") - try: - blob.upload_from_string("", if_generation_match=0) - except Exception: - duplicate = True - return duplicate + message = parse_message(request_json) + if not message: + return 'Invalid Pub/Sub message format', 400 + + attributes = message.get('attributes', {}) + message_id = message.get('messageId', '') + if attributes.get('transfer_status') != 'TRANSFER_COMPLETED': + return 'OK', 200 + + duplicate = check_duplicate(message_id, gcs_bucket_id) + if duplicate: + logging.info(f"Message {message_id} already processed. Skipping.") + return 'OK', 200 + + import_name = attributes.get('import_name') + latest_version = attributes.get( + 'import_version', + datetime.now(timezone.utc).strftime("%Y-%m-%d")) + import_step = attributes.get('import_step', '') + graph_path = attributes.get('graph_path', "/**/*.mcf*") + import_size = attributes.get('import_size', '') + cron_schedule = attributes.get('cron_schedule', '') + + if import_step in ('ingestion_workflow_single', 'ingestion_workflow_batch'): + import_status = 'STAGING' + job_id = attributes.get('feed_name', 'cda_feed') + + # Build request dict to update status internally + latest_version_gcs = 'gs://' + gcs_bucket_id + '/' + import_name.replace( + ':', '/') + '/' + latest_version + update_request = { + 'importName': import_name, + 'status': import_status, + 'jobId': job_id, + 'latestVersion': latest_version_gcs, + 'graphPath': graph_path, + } + if cron_schedule: + update_request['nextRefresh'] = calculate_next_refresh(cron_schedule) + + import_utils.handle_update_import_status( + update_request, spanner, storage, is_base_dc, project_id, location + ) + + if import_step == 'ingestion_workflow_single': + invoke_spanner_ingestion_workflow(project_id, location, import_name) + + elif import_step in ('import_automation_job', 'import_automation_e2e'): + run_ingestion = True if import_step == 'import_automation_e2e' else False + invoke_import_automation_workflow(project_id, location, import_name, + latest_version, import_size, + graph_path, cron_schedule, run_ingestion) + else: + logging.info("Skipping import post processing.") + + return 'OK', 200 diff --git a/pipeline/workflow/ingestion-helper/import_utils.py b/pipeline/workflow/ingestion-helper/import_utils.py index 33f9d1fac..14a393357 100644 --- a/pipeline/workflow/ingestion-helper/import_utils.py +++ b/pipeline/workflow/ingestion-helper/import_utils.py @@ -14,6 +14,7 @@ """Utility functions for the ingestion helper.""" import logging +import os import re from datetime import datetime, timezone from googleapiclient.discovery import build @@ -173,3 +174,43 @@ def get_ingestion_metrics(project_id, location, job_id): 'edge_count': edge_count, 'execution_time': execution_time } + + +def handle_update_import_status(request_json: dict, + spanner, + storage, + is_base_dc: bool, + project_id: str, + location: str): + """Updates the status of a specific import job. + + Args: + request_json: The request payload containing import details. + spanner: Instantiated SpannerClient object. + storage: Instantiated StorageClient object. + is_base_dc: Flag indicating if this is base DC. + project_id: GCP project ID. + location: Location of scheduler / services. + """ + import_name = request_json['importName'] + status = request_json['status'] + logging.info(f'Updating import {import_name} to status {status}') + params = get_import_params(request_json) + next_refresh = None + if is_base_dc: + next_refresh = get_next_refresh(project_id, location, import_name) + + if next_refresh: + params['next_refresh'] = next_refresh + + if status == 'STAGING': + version = os.path.basename(request_json.get('latestVersion', '')) + if not version: + raise ValueError(f'Empty version for import {import_name}') + storage.update_version_file(import_name, version, is_staging=True) + storage.update_provenance_file(import_name, version) + storage.update_import_summary(params) + storage.update_version_file(import_name, version, is_staging=False) + comment = f"import-workflow:{request_json.get('jobId','')}" + spanner.update_version_history(import_name, version, comment) + spanner.update_import_status(params) diff --git a/pipeline/workflow/ingestion-helper/main.py b/pipeline/workflow/ingestion-helper/main.py index e3b0eac57..e53b26232 100644 --- a/pipeline/workflow/ingestion-helper/main.py +++ b/pipeline/workflow/ingestion-helper/main.py @@ -8,6 +8,7 @@ import import_utils from flask import jsonify from aggregation_utils import AggregationUtils +import feed_event_utils logging.getLogger().setLevel(logging.INFO) @@ -69,11 +70,6 @@ def ingestion_helper(request): if not request_json: return ('Request is not a valid JSON', 400) - validation_error = _validate_params(request_json, ['actionType']) - if validation_error: - return (validation_error, 400) - - action_type = request_json['actionType'] spanner = SpannerClient(FLAGS.spanner_project_id, FLAGS.spanner_instance_id, FLAGS.spanner_database_id, @@ -83,6 +79,24 @@ def ingestion_helper(request): 'text-embedding-005')) storage = StorageClient(FLAGS.gcs_bucket_id) + # Route Pub/Sub messages (CDA Feed Events) to feed_event_utils + if 'message' in request_json: + return feed_event_utils.handle_feed_event( + request_json=request_json, + spanner=spanner, + storage=storage, + project_id=FLAGS.project_id, + location=FLAGS.location, + gcs_bucket_id=FLAGS.gcs_bucket_id, + is_base_dc=FLAGS.is_base_dc + ) + + validation_error = _validate_params(request_json, ['actionType']) + if validation_error: + return (validation_error, 400) + + action_type = request_json['actionType'] + if action_type == 'get_import_info': # Gets the details of imports that are ready for ingestion. # Input: diff --git a/pipeline/workflow/ingestion-helper/main_test.py b/pipeline/workflow/ingestion-helper/main_test.py index 69c87ebfa..3e177f4fc 100644 --- a/pipeline/workflow/ingestion-helper/main_test.py +++ b/pipeline/workflow/ingestion-helper/main_test.py @@ -141,6 +141,31 @@ def test_seed_database_success(self, mock_spanner_client_class): self.assertIn("OK", response) mock_spanner_client.seed_database.assert_called_once() + @patch.dict(os.environ, { + "SPANNER_INSTANCE_ID": "test-instance", + "SPANNER_DATABASE_ID": "test-db", + "SPANNER_PROJECT_ID": "test-proj" + }) + @patch('main.SpannerClient') + @patch('main.StorageClient') + @patch('main.feed_event_utils.handle_feed_event', return_value=('OK', 200)) + def test_feed_event_routing(self, mock_handle, mock_storage_class, mock_spanner_class): + mock_request = MagicMock() + mock_request.get_json.return_value = { + "message": { + "attributes": { + "transfer_status": "TRANSFER_COMPLETED", + }, + "messageId": "msg123" + } + } + + response, status_code = main.ingestion_helper(mock_request) + + self.assertEqual(status_code, 200) + self.assertEqual(response, "OK") + mock_handle.assert_called_once() + if __name__ == '__main__': unittest.main() diff --git a/pipeline/workflow/ingestion-helper/pyproject.toml b/pipeline/workflow/ingestion-helper/pyproject.toml index 7abcac136..c7d06c5fe 100644 --- a/pipeline/workflow/ingestion-helper/pyproject.toml +++ b/pipeline/workflow/ingestion-helper/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "absl-py", "google-cloud-bigquery", "redis", + "google-cloud-workflows", + "croniter", ] [tool.hatch.build.targets.wheel] diff --git a/pipeline/workflow/tag-prod.yaml b/pipeline/workflow/tag-prod.yaml index cc8b8db0a..5a175121d 100644 --- a/pipeline/workflow/tag-prod.yaml +++ b/pipeline/workflow/tag-prod.yaml @@ -23,10 +23,6 @@ steps: name: 'gcr.io/cloud-builders/gcloud' args: ['container', 'images', 'add-tag', '${_AR_REPO_URL}/datacommons-ingestion-helper:${_VERSION}', '${_AR_REPO_URL}/datacommons-ingestion-helper:${_PROD_TAG}', '--quiet'] -- id: 'tag-import-helper' - name: 'gcr.io/cloud-builders/gcloud' - args: ['container', 'images', 'add-tag', '${_AR_REPO_URL}/datacommons-import-helper:${_VERSION}', '${_AR_REPO_URL}/datacommons-import-helper:${_PROD_TAG}', '--quiet'] - - id: 'update-dataflow-template' name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' entrypoint: 'gsutil'