diff --git a/acceptance/bundle/validate/for_each_task_max_retries/databricks.yml b/acceptance/bundle/validate/for_each_task_max_retries/databricks.yml new file mode 100644 index 0000000000..dfe07e4484 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_max_retries/databricks.yml @@ -0,0 +1,16 @@ +bundle: + name: "for_each_task_validation" + +resources: + jobs: + test_job: + name: "Test Job" + tasks: + - task_key: "parent_task" + for_each_task: + inputs: "[1, 2, 3]" + task: + task_key: "child_task" + notebook_task: + notebook_path: "test.py" + max_retries: 3 diff --git a/acceptance/bundle/validate/for_each_task_max_retries/out.test.toml b/acceptance/bundle/validate/for_each_task_max_retries/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_max_retries/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/validate/for_each_task_max_retries/output.txt b/acceptance/bundle/validate/for_each_task_max_retries/output.txt new file mode 100644 index 0000000000..d2f057650c --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_max_retries/output.txt @@ -0,0 +1,16 @@ +Error: Invalid max_retries configuration for for_each_task + at resources.jobs.test_job.tasks[0] + in databricks.yml:9:11 + +Task "parent_task" has max_retries defined at the parent level, but it uses for_each_task. +When using for_each_task, max_retries must be defined on the nested task (for_each_task.task.max_retries), not on the parent task. + +Name: for_each_task_validation +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/for_each_task_validation/default + +Found 1 error + +Exit code: 1 diff --git a/acceptance/bundle/validate/for_each_task_max_retries/script b/acceptance/bundle/validate/for_each_task_max_retries/script new file mode 100644 index 0000000000..d286e54863 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_max_retries/script @@ -0,0 +1,2 @@ +#!/bin/bash +errcode $CLI bundle validate diff --git a/acceptance/bundle/validate/for_each_task_max_retries/test.py b/acceptance/bundle/validate/for_each_task_max_retries/test.py new file mode 100644 index 0000000000..4d1620a14e --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_max_retries/test.py @@ -0,0 +1,2 @@ +# Databricks notebook source +print("test") diff --git a/acceptance/bundle/validate/for_each_task_retry_fields/databricks.yml b/acceptance/bundle/validate/for_each_task_retry_fields/databricks.yml new file mode 100644 index 0000000000..69ea50df59 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_retry_fields/databricks.yml @@ -0,0 +1,18 @@ +bundle: + name: "for_each_task_validation" + +resources: + jobs: + test_job: + name: "Test Job" + tasks: + - task_key: "parent_task" + for_each_task: + inputs: "[1, 2, 3]" + task: + task_key: "child_task" + notebook_task: + notebook_path: "test.py" + max_retries: 3 + min_retry_interval_millis: 1000 + retry_on_timeout: true diff --git a/acceptance/bundle/validate/for_each_task_retry_fields/out.test.toml b/acceptance/bundle/validate/for_each_task_retry_fields/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_retry_fields/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/validate/for_each_task_retry_fields/output.txt b/acceptance/bundle/validate/for_each_task_retry_fields/output.txt new file mode 100644 index 0000000000..311d855947 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_retry_fields/output.txt @@ -0,0 +1,30 @@ +Error: Invalid max_retries configuration for for_each_task + at resources.jobs.test_job.tasks[0] + in databricks.yml:9:11 + +Task "parent_task" has max_retries defined at the parent level, but it uses for_each_task. +When using for_each_task, max_retries must be defined on the nested task (for_each_task.task.max_retries), not on the parent task. + +Warning: Invalid min_retry_interval_millis configuration for for_each_task + at resources.jobs.test_job.tasks[0] + in databricks.yml:9:11 + +Task "parent_task" has min_retry_interval_millis defined at the parent level, but it uses for_each_task. +When using for_each_task, min_retry_interval_millis must be defined on the nested task (for_each_task.task.min_retry_interval_millis), not on the parent task. + +Warning: Invalid retry_on_timeout configuration for for_each_task + at resources.jobs.test_job.tasks[0] + in databricks.yml:9:11 + +Task "parent_task" has retry_on_timeout defined at the parent level, but it uses for_each_task. +When using for_each_task, retry_on_timeout must be defined on the nested task (for_each_task.task.retry_on_timeout), not on the parent task. + +Name: for_each_task_validation +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/for_each_task_validation/default + +Found 1 error and 2 warnings + +Exit code: 1 diff --git a/acceptance/bundle/validate/for_each_task_retry_fields/script b/acceptance/bundle/validate/for_each_task_retry_fields/script new file mode 100644 index 0000000000..d286e54863 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_retry_fields/script @@ -0,0 +1,2 @@ +#!/bin/bash +errcode $CLI bundle validate diff --git a/acceptance/bundle/validate/for_each_task_retry_fields/test.py b/acceptance/bundle/validate/for_each_task_retry_fields/test.py new file mode 100644 index 0000000000..4d1620a14e --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_retry_fields/test.py @@ -0,0 +1,2 @@ +# Databricks notebook source +print("test") diff --git a/acceptance/bundle/validate/for_each_task_valid/databricks.yml b/acceptance/bundle/validate/for_each_task_valid/databricks.yml new file mode 100644 index 0000000000..a1dbd9aa56 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_valid/databricks.yml @@ -0,0 +1,16 @@ +bundle: + name: "for_each_task_validation" + +resources: + jobs: + test_job: + name: "Test Job" + tasks: + - task_key: "parent_task" + for_each_task: + inputs: "[1, 2, 3]" + task: + task_key: "child_task" + max_retries: 3 + notebook_task: + notebook_path: "test.py" diff --git a/acceptance/bundle/validate/for_each_task_valid/out.test.toml b/acceptance/bundle/validate/for_each_task_valid/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_valid/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/validate/for_each_task_valid/output.txt b/acceptance/bundle/validate/for_each_task_valid/output.txt new file mode 100644 index 0000000000..6503425481 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_valid/output.txt @@ -0,0 +1,7 @@ +Name: for_each_task_validation +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/for_each_task_validation/default + +Validation OK! diff --git a/acceptance/bundle/validate/for_each_task_valid/script b/acceptance/bundle/validate/for_each_task_valid/script new file mode 100644 index 0000000000..c57053d9d0 --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_valid/script @@ -0,0 +1,2 @@ +#!/bin/bash +$CLI bundle validate diff --git a/acceptance/bundle/validate/for_each_task_valid/test.py b/acceptance/bundle/validate/for_each_task_valid/test.py new file mode 100644 index 0000000000..4d1620a14e --- /dev/null +++ b/acceptance/bundle/validate/for_each_task_valid/test.py @@ -0,0 +1,2 @@ +# Databricks notebook source +print("test") diff --git a/bundle/config/validate/fast_validate.go b/bundle/config/validate/fast_validate.go index d01eb8c149..d7d2c8eab7 100644 --- a/bundle/config/validate/fast_validate.go +++ b/bundle/config/validate/fast_validate.go @@ -29,6 +29,7 @@ func (f *fastValidate) Apply(ctx context.Context, rb *bundle.Bundle) diag.Diagno // Fast mutators with only in-memory checks JobClusterKeyDefined(), JobTaskClusterSpec(), + ForEachTask(), // Blocking mutators. Deployments will fail if these checks fail. ValidateArtifactPath(), diff --git a/bundle/config/validate/for_each_task.go b/bundle/config/validate/for_each_task.go new file mode 100644 index 0000000000..c1d68be8b5 --- /dev/null +++ b/bundle/config/validate/for_each_task.go @@ -0,0 +1,76 @@ +package validate + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +// ForEachTask validates constraints for for_each_task configuration +func ForEachTask() bundle.ReadOnlyMutator { + return &forEachTask{} +} + +type forEachTask struct{ bundle.RO } + +func (v *forEachTask) Name() string { + return "validate:for_each_task" +} + +func (v *forEachTask) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + diags := diag.Diagnostics{} + + jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs")) + + for resourceName, job := range b.Config.Resources.Jobs { + resourcePath := jobsPath.Append(dyn.Key(resourceName)) + + for taskIndex, task := range job.Tasks { + taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex)) + + if task.ForEachTask != nil { + diags = diags.Extend(validateForEachTask(b, task, taskPath)) + } + } + } + + return diags +} + +func validateForEachTask(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics { + diags := diag.Diagnostics{} + + if task.MaxRetries != 0 { + diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "max_retries", diag.Error)) + } + + if task.MinRetryIntervalMillis != 0 { + diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "min_retry_interval_millis", diag.Warning)) + } + + if task.RetryOnTimeout { + diags = diags.Append(invalidRetryFieldDiag(b, task, taskPath, "retry_on_timeout", diag.Warning)) + } + + return diags +} + +func invalidRetryFieldDiag(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path, fieldName string, severity diag.Severity) diag.Diagnostic { + detail := fmt.Sprintf( + "Task %q has %s defined at the parent level, but it uses for_each_task.\n"+ + "When using for_each_task, %s must be defined on the nested task (for_each_task.task.%s), not on the parent task.", + task.TaskKey, fieldName, fieldName, fieldName, + ) + + return diag.Diagnostic{ + Severity: severity, + Summary: fmt.Sprintf("Invalid %s configuration for for_each_task", fieldName), + Detail: detail, + Locations: b.Config.GetLocations(taskPath.String()), + Paths: []dyn.Path{taskPath}, + } +}