From 2777fc4e4deb2314bf10bbacc40003bfecc0d367 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Fri, 5 Jun 2026 17:06:22 +0000 Subject: [PATCH] Bug fixes --- pipeline/workflow/cloudbuild.yaml | 3 +-- pipeline/workflow/deploy-services.yaml | 4 ++-- pipeline/workflow/ingestion-helper/main.py | 6 +++++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pipeline/workflow/cloudbuild.yaml b/pipeline/workflow/cloudbuild.yaml index 9ec54f86..500f1616 100644 --- a/pipeline/workflow/cloudbuild.yaml +++ b/pipeline/workflow/cloudbuild.yaml @@ -30,7 +30,6 @@ substitutions: _AR_REPO_URL: 'us-docker.pkg.dev/datcom-ci/gcr.io' _BQ_SPANNER_CONN_ID: 'projects/datcom-ci/locations/us-central1/connections/bq_spanner_conn_test' _VERSION: '${SHORT_SHA}' - _DATAFLOW_TEMPLATE_PATH: 'gs://datcom-templates/templates/flex/ingestion-${_VERSION}.json' _PROD_TAG: 'stable' steps: @@ -65,7 +64,7 @@ steps: - '.' - '--config=deploy-services.yaml' - '--project=${_PROJECT_ID}' - - '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID},_PROJECT_NUMBER=${_PROJECT_NUMBER},_BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID},_VERSION=${_VERSION},_DATAFLOW_TEMPLATE_PATH=${_DATAFLOW_TEMPLATE_PATH}' + - '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID},_PROJECT_NUMBER=${_PROJECT_NUMBER},_BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID},_VERSION=${_VERSION}' dir: 'pipeline/workflow' # 2. Run E2E Tests on Staging diff --git a/pipeline/workflow/deploy-services.yaml b/pipeline/workflow/deploy-services.yaml index 8316796b..a5720c37 100644 --- a/pipeline/workflow/deploy-services.yaml +++ b/pipeline/workflow/deploy-services.yaml @@ -30,7 +30,7 @@ substitutions: _AR_REPO_URL: 'us-docker.pkg.dev/datcom-ci/gcr.io' _BQ_SPANNER_CONN_ID: 'projects/datcom-import-automation-prod/locations/us/connections/bq_spanner_conn' _VERSION: '${SHORT_SHA}' - _DATAFLOW_TEMPLATE_PATH: 'gs://datcom-templates/templates/flex/ingestion-${_VERSION}.json' + _DATAFLOW_TEMPLATE_PATH: 'gs://datcom-templates/templates/flex/' steps: - id: 'ingestion-helper-service' @@ -47,7 +47,7 @@ steps: - id: 'spanner-ingestion-workflow' name: 'gcr.io/cloud-builders/gcloud' - args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},PROJECT_NUMBER=${_PROJECT_NUMBER},DATAFLOW_TEMPLATE_PATH=${_DATAFLOW_TEMPLATE_PATH}'] + args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},PROJECT_NUMBER=${_PROJECT_NUMBER},DATAFLOW_TEMPLATE_PATH=${_DATAFLOW_TEMPLATE_PATH}ingestion-${_VERSION}.json'] options: logging: CLOUD_LOGGING_ONLY diff --git a/pipeline/workflow/ingestion-helper/main.py b/pipeline/workflow/ingestion-helper/main.py index e3b0eac5..5b60514b 100644 --- a/pipeline/workflow/ingestion-helper/main.py +++ b/pipeline/workflow/ingestion-helper/main.py @@ -259,6 +259,9 @@ def ingestion_helper(request): # Input: # importList: list of imports to aggregate import_list = request_json.get('importList', []) + if not import_list: + logging.info("Empty import list. Skipping aggregation.") + return jsonify({'status': 'SUBMITTED', 'jobIds': []}), 200 # Validate required flags are not empty or None missing_flags = [] @@ -312,7 +315,8 @@ def ingestion_helper(request): # jobIds: list of BigQuery job IDs job_ids = request_json.get('jobIds', []) if not job_ids: - return ('Missing or empty jobIds', 400) + logging.info("Empty jobIds. Returning status DONE.") + return jsonify({'status': 'DONE'}), 200 aggregation = AggregationUtils( connection_id=FLAGS.spanner_connection_id,