From 8a90bc4c782f4ac67ce7f7ee8ccf411b18db0f9c Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Thu, 14 May 2026 05:35:03 +0000 Subject: [PATCH] Add config to disable skip status --- import-automation/executor/app/configs.py | 2 ++ .../executor/app/executor/import_executor.py | 2 +- .../workflow/ingestion-helper/main.py | 22 +++++++++---------- .../manifest.json | 5 ++++- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py index 5fda8ae26f..de49bc80e4 100644 --- a/import-automation/executor/app/configs.py +++ b/import-automation/executor/app/configs.py @@ -169,6 +169,8 @@ class ExecutorConfig: email_token: str = '' # Email alerts are disabled by default. Cloud Run jobs use GCP alerting. disable_email_notifications: bool = True + # When True, mark an import as SKIP (skipping ingestion) if there is no data diff. + enable_skip_status: bool = True # Skip uploading the data to GCS (for local testing). skip_gcs_upload: bool = False # Skip uploading input files to GCS. 
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 60eb0c7b42..35891878c6 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -721,7 +721,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str, logging.error( "Marking import as VALIDATION due to validation failure.") import_summary.status = ImportStatus.VALIDATION - elif not differ_status: + elif self.config.enable_skip_status and not differ_status: logging.info("Marking import as SKIP due to no data diff.") import_summary.status = ImportStatus.SKIP else: diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index b0159a3d5c..9921f0fd34 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -63,7 +63,7 @@ def ingestion_helper(request): if validation_error: return (validation_error, 400) - actionType = request_json['actionType'] + action_type = request_json['actionType'] spanner = SpannerClient(FLAGS.spanner_project_id, FLAGS.spanner_instance_id, FLAGS.spanner_database_id, @@ -73,7 +73,7 @@ def ingestion_helper(request): 'text-embedding-005')) storage = StorageClient(FLAGS.gcs_bucket_id) - if actionType == 'get_import_info': + if action_type == 'get_import_info': # Gets the details of imports that are ready for ingestion. # Input: # importList: list of import names to ingest (optional) @@ -81,7 +81,7 @@ def ingestion_helper(request): import_info = spanner.get_import_info(import_list) return jsonify(import_info) - elif actionType == 'acquire_ingestion_lock': + elif action_type == 'acquire_ingestion_lock': # Attempts to acquire the global lock for ingestion. 
# Input: # workflowId: ID of the workflow acquiring the lock @@ -97,7 +97,7 @@ def ingestion_helper(request): return ('Failed to acquire lock', 500) return ('OK', 200) - elif actionType == 'release_ingestion_lock': + elif action_type == 'release_ingestion_lock': # Releases the global ingestion lock. # Input: # workflowId: ID of the workflow releasing the lock @@ -110,7 +110,7 @@ def ingestion_helper(request): return ('Failed to release lock', 500) return ('OK', 200) - elif actionType == 'update_ingestion_status': + elif action_type == 'update_ingestion_status': # Updates the status of imports after ingestion. # Input: # importList: list of import names @@ -136,7 +136,7 @@ def ingestion_helper(request): spanner.update_import_version_history(import_list, workflow_id) return ('OK', 200) - elif actionType == 'update_import_status': + elif action_type == 'update_import_status': # Updates the status of a specific import job. # Input: # importName: name of the import @@ -173,7 +173,7 @@ def ingestion_helper(request): spanner.update_import_status(params) return ('OK', 200) - elif actionType == 'update_import_version': + elif action_type == 'update_import_version': # Updates the version and status of an import. # Input: # importName: name of the import @@ -212,14 +212,14 @@ def ingestion_helper(request): f"OK [Import: {import_name} Version: {version} Status: {params['status']}]", 200) - elif actionType == 'initialize_database': + elif action_type == 'initialize_database': # Initializes the database by creating all required tables and proto bundles. 
logging.info("Action: initialize_database") enable_embeddings = request_json.get('enableEmbeddings', FLAGS.enable_embeddings) spanner.initialize_database(enable_embeddings=enable_embeddings) return ('OK', 200) - elif actionType == 'embedding_ingestion': + elif action_type == 'embedding_ingestion': logging.info("Action: embedding_ingestion") enable_embeddings = request_json.get('enableEmbeddings', FLAGS.enable_embeddings) @@ -238,7 +238,7 @@ def ingestion_helper(request): except Exception as e: logging.error(f"Embedding ingestion failed: {e}") return (f"Error: {e}", 500) - elif actionType == 'run_aggregation': + elif action_type == 'run_aggregation': # Runs aggregation logic for the specified imports. # Input: # importList: list of imports to aggregate @@ -253,4 +253,4 @@ def ingestion_helper(request): return (f"Aggregation failed: {str(e)}", 500) else: - return (f'Unknown actionType: {actionType}', 400) + return (f'Unknown actionType: {action_type}', 400) diff --git a/scripts/us_fed/treasury_constant_maturity_rates/manifest.json b/scripts/us_fed/treasury_constant_maturity_rates/manifest.json index 009009b94c..4740a36f5a 100644 --- a/scripts/us_fed/treasury_constant_maturity_rates/manifest.json +++ b/scripts/us_fed/treasury_constant_maturity_rates/manifest.json @@ -20,7 +20,10 @@ "source_files": [ "treasury_constant_maturity_rates.csv" ], - "cron_schedule": "15 3 * * *" + "cron_schedule": "15 3 * * *", + "config_override": { + "enable_skip_status": false + } }, { "import_name": "USFed_ConstantMaturityRates",