From 9e65bcee4a89e2ee44916cd81a112dbd98cd3524 Mon Sep 17 00:00:00 2001 From: Vishal Gupta Date: Fri, 1 May 2026 11:07:57 +0000 Subject: [PATCH] Update ingestion helper to use cloud run services --- import-automation/terraform/main.tf | 232 +++++++----------- .../workflow/aggregation-helper/main.py | 74 ------ .../aggregation-helper/requirements.txt | 2 - import-automation/workflow/cloudbuild.yaml | 8 +- .../workflow/import-helper/Dockerfile | 32 +++ .../workflow/import-helper/cloudbuild.yaml | 39 +++ .../ingestion-helper/aggregation_utils.py | 69 ++++++ .../workflow/ingestion-helper/cloudbuild.yaml | 38 ++- .../workflow/ingestion-helper/main.py | 15 ++ .../workflow/ingestion-helper/pyproject.toml | 1 + .../workflow/spanner-ingestion-workflow.yaml | 4 +- import-automation/workflow/spanner_schema.sql | 55 ----- 12 files changed, 256 insertions(+), 313 deletions(-) delete mode 100644 import-automation/workflow/aggregation-helper/main.py delete mode 100644 import-automation/workflow/aggregation-helper/requirements.txt create mode 100644 import-automation/workflow/import-helper/Dockerfile create mode 100644 import-automation/workflow/import-helper/cloudbuild.yaml create mode 100644 import-automation/workflow/ingestion-helper/aggregation_utils.py delete mode 100644 import-automation/workflow/spanner_schema.sql diff --git a/import-automation/terraform/main.tf b/import-automation/terraform/main.tf index 7c6ed1fd1d..131fc4e324 100644 --- a/import-automation/terraform/main.tf +++ b/import-automation/terraform/main.tf @@ -14,7 +14,7 @@ # - Necessary GCP APIs # - Secret Manager for the import-config secret # - GCS Buckets for imports, mounting, and Dataflow templates -# - Spanner Instance and Database with schema +# - Spanner Instance and Database # - Artifact Registry for hosting Docker images (Flex Template & Executor) # - Pub/Sub Topic and Subscription for triggering imports # - Cloud Functions, Workflows, and Ingestion Pipeline @@ -73,6 +73,12 @@ variable "spanner_database_id" { default = "dc-import-db" } +variable "spanner_graph_database_id" { + description = "Spanner Graph Database ID" + type = string + default = "dc-import-db" +} + variable "bq_dataset_id" { description = "BigQuery Dataset ID for aggregation" type = string @@ -172,141 +178,80 @@ resource "google_storage_bucket" "mount_bucket" { # --- Cloud Functions Source Packaging --- -data "archive_file" "ingestion_helper_source" { - type = "zip" - source_dir = "${path.module}/../workflow/ingestion-helper" - output_path = "${path.module}/ingestion_helper.zip" -} - -data "archive_file" "aggregation_helper_source" { - type = "zip" - source_dir = "${path.module}/../workflow/aggregation-helper" - output_path = "${path.module}/aggregation_helper.zip" -} - -data "archive_file" "import_helper_source" { - type = "zip" - source_dir = "${path.module}/../workflow/import-helper" - output_path = "${path.module}/import_helper.zip" -} - -resource "google_storage_bucket_object" "ingestion_helper_zip" { - name = "function-source/ingestion_helper.${data.archive_file.ingestion_helper_source.output_md5}.zip" - bucket = google_storage_bucket.import_bucket.name - source = data.archive_file.ingestion_helper_source.output_path -} - -resource "google_storage_bucket_object" "aggregation_helper_zip" { - name = "function-source/aggregation_helper.${data.archive_file.aggregation_helper_source.output_md5}.zip" - bucket = google_storage_bucket.import_bucket.name - source = data.archive_file.aggregation_helper_source.output_path -} - -resource "google_storage_bucket_object" "import_helper_zip" { - name = "function-source/import_helper.${data.archive_file.import_helper_source.output_md5}.zip" - bucket = google_storage_bucket.import_bucket.name - source = data.archive_file.import_helper_source.output_path -} - # --- Cloud Functions --- -resource "google_cloudfunctions2_function" "ingestion_helper" { - name = "spanner-ingestion-helper" - location = var.region - project = var.project_id - description = "Helper for Spanner ingestion" - - build_config { - runtime = "python312" - entry_point = "ingestion_helper" - source { - storage_source { - bucket = google_storage_bucket.import_bucket.name - object = google_storage_bucket_object.ingestion_helper_zip.name - } - } - } - - service_config { - max_instance_count = 10 - available_memory = "256M" - timeout_seconds = 60 - service_account_email = google_service_account.automation_sa.email - environment_variables = { - PROJECT_ID = var.project_id - SPANNER_PROJECT_ID = var.project_id - SPANNER_INSTANCE_ID = var.spanner_instance_id - SPANNER_DATABASE_ID = var.spanner_database_id - GCS_BUCKET_ID = google_storage_bucket.import_bucket.name - LOCATION = var.region - } - } - - depends_on = [google_project_service.services] -} +resource "google_cloud_run_v2_service" "ingestion_helper" { + name = "spanner-ingestion-helper" + location = var.region + project = var.project_id -resource "google_cloudfunctions2_function" "aggregation_helper" { - name = "import-aggregation-helper" - location = var.region - project = var.project_id - description = "Helper for import aggregation" - - build_config { - runtime = "python312" - entry_point = "aggregation_helper" - source { - storage_source { - bucket = google_storage_bucket.import_bucket.name - object = google_storage_bucket_object.aggregation_helper_zip.name + template { + service_account = google_service_account.automation_sa.email + containers { + image = "${var.region}-docker.pkg.dev/${var.project_id}/${google_artifact_registry_repository.automation_repo.repository_id}/spanner-ingestion-helper:latest" + env { + name = "PROJECT_ID" + value = var.project_id + } + env { + name = "SPANNER_PROJECT_ID" + value = var.project_id + } + env { + name = "SPANNER_INSTANCE_ID" + value = var.spanner_instance_id + } + env { + name = "SPANNER_DATABASE_ID" + value = var.spanner_database_id + } + env { + name = "SPANNER_GRAPH_DATABASE_ID" + value = var.spanner_graph_database_id + } + env { + name = "GCS_BUCKET_ID" + value = google_storage_bucket.import_bucket.name + } + env { + name = "LOCATION" + value = var.region + } + env { + name = "BQ_DATASET_ID" + value = var.bq_dataset_id } - } - } - - service_config { - max_instance_count = 10 - available_memory = "256M" - timeout_seconds = 60 - service_account_email = google_service_account.automation_sa.email - environment_variables = { - PROJECT_ID = var.project_id - SPANNER_PROJECT_ID = var.project_id - SPANNER_INSTANCE_ID = var.spanner_instance_id - SPANNER_DATABASE_ID = var.spanner_database_id - GCS_BUCKET_ID = google_storage_bucket.import_bucket.name - LOCATION = var.region - BQ_DATASET_ID = var.bq_dataset_id } } depends_on = [google_project_service.services] } -resource "google_cloudfunctions2_function" "import_helper" { - name = "import-automation-helper" - location = var.region - project = var.project_id - description = "Helper for import automation" - - build_config { - runtime = "python312" - entry_point = "handle_feed_event" - source { - storage_source { - bucket = google_storage_bucket.import_bucket.name - object = google_storage_bucket_object.import_helper_zip.name - } - } - } +resource "google_cloud_run_v2_service" "import_helper" { + name = "import-automation-helper" + location = var.region + project = var.project_id - service_config { - max_instance_count = 10 - available_memory = "256M" - timeout_seconds = 60 - service_account_email = google_service_account.automation_sa.email - environment_variables = { - PROJECT_ID = var.project_id - LOCATION = var.region - GCS_BUCKET_ID = google_storage_bucket.import_bucket.name + template { + service_account = google_service_account.automation_sa.email + containers { + image = "${var.region}-docker.pkg.dev/${var.project_id}/${google_artifact_registry_repository.automation_repo.repository_id}/import-automation-helper:latest" + env { + name = "PROJECT_ID" + value = var.project_id + } + env { + name = "LOCATION" + value = var.region + } + env { + name = "GCS_BUCKET_ID" + value = google_storage_bucket.import_bucket.name + } + env { + name = "INGESTION_HELPER_URL" + value = google_cloud_run_v2_service.ingestion_helper.uri + } } } @@ -324,9 +269,10 @@ resource "google_workflows_workflow" "import_automation_workflow" { source_contents = file("${path.module}/../workflow/import-automation-workflow.yaml") user_env_vars = { - LOCATION = var.region - GCS_BUCKET_ID = google_storage_bucket.import_bucket.name - GCS_MOUNT_BUCKET = google_storage_bucket.mount_bucket.name + LOCATION = var.region + GCS_BUCKET_ID = google_storage_bucket.import_bucket.name + GCS_MOUNT_BUCKET = google_storage_bucket.mount_bucket.name + INGESTION_HELPER_URL = google_cloud_run_v2_service.ingestion_helper.uri } depends_on = [google_project_service.services] @@ -341,11 +287,12 @@ resource "google_workflows_workflow" "spanner_ingestion_workflow" { source_contents = file("${path.module}/../workflow/spanner-ingestion-workflow.yaml") user_env_vars = { - LOCATION = var.region - PROJECT_ID = var.project_id - SPANNER_PROJECT_ID = var.project_id - SPANNER_INSTANCE_ID = var.spanner_instance_id - SPANNER_DATABASE_ID = var.spanner_database_id + LOCATION = var.region + PROJECT_ID = var.project_id + SPANNER_PROJECT_ID = var.project_id + SPANNER_INSTANCE_ID = var.spanner_instance_id + SPANNER_DATABASE_ID = var.spanner_database_id + INGESTION_HELPER_URL = google_cloud_run_v2_service.ingestion_helper.uri } depends_on = [google_project_service.services] @@ -367,25 +314,9 @@ resource "google_spanner_database" "import_db" { instance = google_spanner_instance.import_instance.name name = var.spanner_database_id project = var.project_id - ddl = [for s in split(";", file("${path.module}/../workflow/spanner_schema.sql")) : trimspace(s) if trimspace(s) != ""] - deletion_protection = false } -# Initialize IngestionLock (DML) -resource "null_resource" "init_spanner_lock" { - provisioner "local-exec" { - command = < --instance= --project --ddl-file=./spanner_schema.sql - -CREATE TABLE ImportStatus ( - ImportName STRING(MAX) NOT NULL, - LatestVersion STRING(MAX), - GraphPath STRING(MAX), - State STRING(1024) NOT NULL, - JobId STRING(1024), - WorkflowId STRING(1024), - ExecutionTime INT64, - DataVolume INT64, - DataImportTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), - StatusUpdateTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), - NextRefreshTimestamp TIMESTAMP, -) PRIMARY KEY(ImportName); - -CREATE TABLE IngestionHistory ( - CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ), - IngestionFailure Bool NOT NULL, - WorkflowExecutionID STRING(1024) NOT NULL, - DataflowJobID STRING(1024), - IngestedImports ARRAY, - ExecutionTime INT64, - NodeCount INT64, - EdgeCount INT64, - ObservationCount INT64, -) PRIMARY KEY(CompletionTimestamp DESC); - -CREATE TABLE ImportVersionHistory ( - ImportName STRING(MAX) NOT NULL, - Version STRING(MAX) NOT NULL, - UpdateTimestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), - Comment STRING(MAX), -) PRIMARY KEY (ImportName, UpdateTimestamp DESC); - -CREATE TABLE IngestionLock ( - LockID STRING(1024) NOT NULL, - LockOwner STRING(1024), - AcquiredTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), -) PRIMARY KEY(LockID);