Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class ExecutorConfig:
email_token: str = ''
# Email alerts are disabled by default. Cloud Run jobs use GCP alerting.
disable_email_notifications: bool = True
# Allow skipping ingestion for no data change.
enable_skip_status: bool = True
Comment thread
vish-cs marked this conversation as resolved.
# Skip uploading the data to GCS (for local testing).
skip_gcs_upload: bool = False
# Skip uploading input files to GCS.
Expand Down
2 changes: 1 addition & 1 deletion import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
logging.error(
"Marking import as VALIDATION due to validation failure.")
import_summary.status = ImportStatus.VALIDATION
elif not differ_status:
elif self.config.enable_skip_status and not differ_status:
Comment thread
vish-cs marked this conversation as resolved.
logging.info("Marking import as SKIP due to no data diff.")
Comment thread
vish-cs marked this conversation as resolved.
import_summary.status = ImportStatus.SKIP
else:
Expand Down
22 changes: 11 additions & 11 deletions import-automation/workflow/ingestion-helper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def ingestion_helper(request):
if validation_error:
return (validation_error, 400)

actionType = request_json['actionType']
action_type = request_json['actionType']
spanner = SpannerClient(FLAGS.spanner_project_id,
FLAGS.spanner_instance_id,
FLAGS.spanner_database_id,
Expand All @@ -73,15 +73,15 @@ def ingestion_helper(request):
'text-embedding-005'))
storage = StorageClient(FLAGS.gcs_bucket_id)

if actionType == 'get_import_info':
if action_type == 'get_import_info':
# Gets the details of imports that are ready for ingestion.
# Input:
# importList: list of import names to ingest (optional)
import_list = request_json.get('importList', [])
import_info = spanner.get_import_info(import_list)
return jsonify(import_info)

elif actionType == 'acquire_ingestion_lock':
elif action_type == 'acquire_ingestion_lock':
# Attempts to acquire the global lock for ingestion.
# Input:
# workflowId: ID of the workflow acquiring the lock
Expand All @@ -97,7 +97,7 @@ def ingestion_helper(request):
return ('Failed to acquire lock', 500)
return ('OK', 200)

elif actionType == 'release_ingestion_lock':
elif action_type == 'release_ingestion_lock':
# Releases the global ingestion lock.
# Input:
# workflowId: ID of the workflow releasing the lock
Expand All @@ -110,7 +110,7 @@ def ingestion_helper(request):
return ('Failed to release lock', 500)
return ('OK', 200)

elif actionType == 'update_ingestion_status':
elif action_type == 'update_ingestion_status':
# Updates the status of imports after ingestion.
# Input:
# importList: list of import names
Expand All @@ -136,7 +136,7 @@ def ingestion_helper(request):
spanner.update_import_version_history(import_list, workflow_id)
return ('OK', 200)

elif actionType == 'update_import_status':
elif action_type == 'update_import_status':
# Updates the status of a specific import job.
# Input:
# importName: name of the import
Expand Down Expand Up @@ -173,7 +173,7 @@ def ingestion_helper(request):
spanner.update_import_status(params)
return ('OK', 200)

elif actionType == 'update_import_version':
elif action_type == 'update_import_version':
# Updates the version and status of an import.
# Input:
# importName: name of the import
Expand Down Expand Up @@ -212,14 +212,14 @@ def ingestion_helper(request):
f"OK [Import: {import_name} Version: {version} Status: {params['status']}]",
200)

elif actionType == 'initialize_database':
elif action_type == 'initialize_database':
# Initializes the database by creating all required tables and proto bundles.
logging.info("Action: initialize_database")
enable_embeddings = request_json.get('enableEmbeddings',
FLAGS.enable_embeddings)
spanner.initialize_database(enable_embeddings=enable_embeddings)
return ('OK', 200)
elif actionType == 'embedding_ingestion':
elif action_type == 'embedding_ingestion':
logging.info("Action: embedding_ingestion")
enable_embeddings = request_json.get('enableEmbeddings',
FLAGS.enable_embeddings)
Expand All @@ -238,7 +238,7 @@ def ingestion_helper(request):
except Exception as e:
logging.error(f"Embedding ingestion failed: {e}")
return (f"Error: {e}", 500)
elif actionType == 'run_aggregation':
elif action_type == 'run_aggregation':
# Runs aggregation logic for the specified imports.
# Input:
# importList: list of imports to aggregate
Expand All @@ -253,4 +253,4 @@ def ingestion_helper(request):
return (f"Aggregation failed: {str(e)}", 500)

else:
return (f'Unknown actionType: {actionType}', 400)
return (f'Unknown actionType: {action_type}', 400)
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
"source_files": [
"treasury_constant_maturity_rates.csv"
],
"cron_schedule": "15 3 * * *"
"cron_schedule": "15 3 * * *",
"config_override": {
"enable_skip_status": false
Comment thread
vish-cs marked this conversation as resolved.
}
},
{
"import_name": "USFed_ConstantMaturityRates",
Expand Down
Loading