From 4a77a10dcd45c4b2ada0f2dfb1be0062db2749da Mon Sep 17 00:00:00 2001 From: Shubham Dhal Date: Wed, 20 May 2026 12:08:37 +0530 Subject: [PATCH 1/3] fix: gate column-level constraints on contract.enforced (#1381) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `adapter.parse_columns_and_constraints` gains a `contract_enforced` flag so the column-level subpath honors the same gate that already applies to the model-level subpath and the diff path. The three V2 create macros (`table`, `materialized_view`, `streaming_table`) compute `contract.enforced` once and pass it in. Pre-fix, under `use_materialization_v2: true` without `contract.enforced: true`, column-level NOT NULL / FK leaked through and were applied to the table, while model-level constraints (e.g. multi-column PK) were correctly gated out. On incremental re-runs the diff path then "cleaned up" the leaked constraints, emitting `DROP CONSTRAINT IF EXISTS` with no follow-up `ADD CONSTRAINT` — silently stripping constraints on every successive run. Closes #1381. Completes the contract-enforcement work from #1342/#1343. --- CHANGELOG.md | 8 ++ dbt/adapters/databricks/impl.py | 11 +- .../relations/materialized_view/create.sql | 5 +- .../relations/streaming_table/create.sql | 4 +- .../macros/relations/table/create.sql | 5 +- .../adapter/constraints/fixtures.py | 96 +++++++++++++ .../test_column_constraint_gate.py | 130 ++++++++++++++++++ tests/unit/test_constraints.py | 64 +++++++++ 8 files changed, 315 insertions(+), 8 deletions(-) create mode 100644 tests/functional/adapter/constraints/test_column_constraint_gate.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 99848374f..b190b95c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ - Expose `job_id`, `job_run_id`, and `task_run_id` from the Databricks Jobs `dbt_task` runtime in `adapter_response`, enabling correlation between dbt runs and Databricks workflow executions via `run_results.json` ([#1451](https://github.com/databricks/dbt-databricks/pull/1451) closes [#722](https://github.com/databricks/dbt-databricks/issues/722)) +### Fixes + +- Gate column-level constraints on `contract.enforced` to match the existing model-level gate, ensuring column-level NOT NULL / PK / FK / CHECK constraints are only applied when `contract.enforced: true` under `use_materialization_v2: true` ([#TBD](https://github.com/databricks/dbt-databricks/pull/TBD) closes [#1381](https://github.com/databricks/dbt-databricks/issues/1381)) + +### Under the Hood + +- **BREAKING:** users who relied on column-level constraints (NOT NULL, primary key, foreign key, check) being applied under `use_materialization_v2: true` without `contract.enforced: true` must now set `contract.enforced: true` explicitly on the model. + ## dbt-databricks 1.12.0 (May 18, 2026) ### Features diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 5e67867fa..ee263e363 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -976,12 +976,17 @@ def parse_columns_and_constraints( existing_columns: list[DatabricksColumn], model_columns: dict[str, dict[str, Any]], model_constraints: list[dict[str, Any]], + contract_enforced: bool = False, ) -> tuple[list[DatabricksColumn], list[constraints.TypedConstraint]]: """Returns a list of columns that have been updated with features for table create.""" enriched_columns = [] - not_null_set, parsed_constraints = constraints.parse_constraints( - list(model_columns.values()), model_constraints - ) + if contract_enforced: + not_null_set, parsed_constraints = constraints.parse_constraints( + list(model_columns.values()), model_constraints + ) + else: + not_null_set = set() + parsed_constraints = [] # Create a case-insensitive lookup for model column names model_columns_lower = {k.lower(): k for k in model_columns.keys()} diff --git a/dbt/include/databricks/macros/relations/materialized_view/create.sql b/dbt/include/databricks/macros/relations/materialized_view/create.sql index 78adba652..f1a4995d6 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/create.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/create.sql @@ -23,7 +23,8 @@ {%- set columns = adapter.get_columns_in_relation(temp_relation) -%} {%- set model_columns = model.get('columns', {}) -%} {%- set contract_config = config.get('contract') -%} - {%- if contract_config and contract_config.enforced -%} + {%- set contract_enforced = contract_config and contract_config.enforced -%} + {%- if contract_enforced -%} {%- do exceptions.warn( "contract.enforced=true on materialized_view '" ~ model.name ~ "': not supported by dbt (https://docs.getdbt.com/docs/mesh/govern/model-contracts). dbt-databricks provides best-effort support that may change without notice." ) -%} @@ -31,7 +32,7 @@ {%- else -%} {%- set model_constraints = [] -%} {%- endif -%} - {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints) -%} + {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints, contract_enforced) -%} {%- set target_relation = relation.enrich(columns_and_constraints[1]) -%} create or replace materialized view {{ target_relation.render() }} diff --git a/dbt/include/databricks/macros/relations/streaming_table/create.sql b/dbt/include/databricks/macros/relations/streaming_table/create.sql index ec18565c4..2e95229a7 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/create.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/create.sql @@ -24,7 +24,9 @@ {%- set columns = adapter.get_columns_in_relation(temp_relation) -%} {%- set model_columns = model.get('columns', {}) -%} - {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, []) -%} + {%- set contract_config = config.get('contract') -%} + {%- set contract_enforced = contract_config and contract_config.enforced -%} + {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, [], contract_enforced) -%} {#-- We don't enrich the relation with model constraints because they are not supported for streaming tables --#} CREATE STREAMING TABLE {{ relation.render() }} diff --git a/dbt/include/databricks/macros/relations/table/create.sql b/dbt/include/databricks/macros/relations/table/create.sql index aa62a5255..f82f33a93 100644 --- a/dbt/include/databricks/macros/relations/table/create.sql +++ b/dbt/include/databricks/macros/relations/table/create.sql @@ -3,12 +3,13 @@ {% set model_columns = model.get('columns', []) %} {% set existing_columns = adapter.get_columns_in_relation(intermediate_relation) %} {% set contract_config = config.get('contract') %} - {% if contract_config and contract_config.enforced %} + {% set contract_enforced = contract_config and contract_config.enforced %} + {% if contract_enforced %} {% set model_constraints = model.get('constraints', []) %} {% else %} {% set model_constraints = [] %} {% endif %} - {% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints) %} + {% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints, contract_enforced) %} {% set target_relation = relation.enrich(columns_and_constraints[1]) %} {% call statement('main') %} diff --git a/tests/functional/adapter/constraints/fixtures.py b/tests/functional/adapter/constraints/fixtures.py index 200e67381..834bb3155 100644 --- a/tests/functional/adapter/constraints/fixtures.py +++ b/tests/functional/adapter/constraints/fixtures.py @@ -222,3 +222,99 @@ 'blue' as color, '2019-01-01' as date_day """ + + +column_constraint_gate_parent_sql = "select cast(1 as int) as id" + +_column_constraint_gate_parent_model_yml = """\ + - name: parent_table + config: + materialized: table + contract: + enforced: true + columns: + - name: id + data_type: int + constraints: + - type: not_null + - type: primary_key + name: pk_parent_table +""" + +column_constraint_gate_child_sql = """ +select + cast(x'00' as binary) as hashkey, + cast('2026-01-01' as timestamp) as load_timestamp, + cast('seed' as string) as record_source, + cast(1 as int) as id +""" + +column_constraint_gate_child_schema_yml = f""" +version: 2 +models: +{_column_constraint_gate_parent_model_yml} - name: child_table + config: + materialized: table + constraints: + - type: primary_key + name: pk_child_table + columns: ["hashkey", "load_timestamp"] + warn_unsupported: false + columns: + - name: hashkey + constraints: + - type: not_null + - name: load_timestamp + constraints: + - type: not_null + - name: record_source + - name: id + constraints: + - type: foreign_key + name: fk_child_table_id + to: ref('parent_table') + to_columns: [id] +""" + +column_constraint_gate_child_with_contract_sql = """ +{{ config(materialized='incremental') }} +select + cast(x'00' as binary) as hashkey, + cast('2026-01-01' as timestamp) as load_timestamp, + cast('seed' as string) as record_source, + cast(1 as int) as id +""" + +column_constraint_gate_child_with_contract_schema_yml = f""" +version: 2 +models: +{_column_constraint_gate_parent_model_yml} - name: child_with_contract + config: + materialized: incremental + on_schema_change: append_new_columns + contract: + enforced: true + constraints: + - type: primary_key + name: pk_child_with_contract + columns: ["hashkey", "load_timestamp"] + warn_unsupported: false + columns: + - name: hashkey + data_type: binary + constraints: + - type: not_null + - name: load_timestamp + data_type: timestamp + constraints: + - type: not_null + - name: record_source + data_type: string + - name: id + data_type: int + constraints: + - type: foreign_key + name: fk_child_with_contract_id + to: ref('parent_table') + to_columns: [id] +""" diff --git a/tests/functional/adapter/constraints/test_column_constraint_gate.py b/tests/functional/adapter/constraints/test_column_constraint_gate.py new file mode 100644 index 000000000..5e52ef196 --- /dev/null +++ b/tests/functional/adapter/constraints/test_column_constraint_gate.py @@ -0,0 +1,130 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.constraints import fixtures +from tests.functional.adapter.fixtures import MaterializationV2Mixin + + +def _constraint_rows(project, table_name, constraint_type): + db = project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT constraint_name + FROM `{db}`.information_schema.table_constraints + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND constraint_type = '{constraint_type}' + """ + return project.run_sql(sql, fetch="all") + + +def _pk_columns(project, table_name): + db = project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT kcu.column_name + FROM `{db}`.information_schema.key_column_usage kcu + WHERE kcu.table_catalog = '{db}' + AND kcu.table_schema = '{sch}' + AND kcu.table_name = '{table_name.lower()}' + AND kcu.constraint_name IN ( + SELECT constraint_name + FROM `{db}`.information_schema.table_constraints + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND constraint_type = 'PRIMARY KEY' + ) + ORDER BY kcu.ordinal_position + """ + return [row[0] for row in project.run_sql(sql, fetch="all")] + + +def _not_null_columns(project, table_name): + db = project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT column_name + FROM `{db}`.information_schema.columns + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND is_nullable = 'NO' + """ + return [row[0] for row in project.run_sql(sql, fetch="all")] + + +@pytest.mark.skip_profile("databricks_cluster") +class TestNoConstraintsWithoutContractEnforcement(MaterializationV2Mixin): + @pytest.fixture(scope="class") + def models(self): + return { + "parent_table.sql": fixtures.column_constraint_gate_parent_sql, + "child_table.sql": fixtures.column_constraint_gate_child_sql, + "schema.yml": fixtures.column_constraint_gate_child_schema_yml, + } + + def test_neither_column_nor_model_constraints_are_applied(self, project): + util.run_dbt(["run"]) + + pk_rows = _constraint_rows(project, "child_table", "PRIMARY KEY") + assert len(pk_rows) == 0, ( + f"Expected no PRIMARY KEY on child_table without contract.enforced, found {pk_rows}" + ) + + fk_rows = _constraint_rows(project, "child_table", "FOREIGN KEY") + assert len(fk_rows) == 0, ( + f"Expected no FOREIGN KEY on child_table without contract.enforced " + f"(column-level FK must be gated), found {fk_rows}" + ) + + not_null_cols = _not_null_columns(project, "child_table") + assert not_null_cols == [], ( + f"Expected no NOT NULL columns on child_table without contract.enforced " + f"(column-level not_null must be gated), found {not_null_cols}" + ) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestConstraintsApplyWithContractEnforced(MaterializationV2Mixin): + @pytest.fixture(scope="class") + def models(self): + return { + "parent_table.sql": fixtures.column_constraint_gate_parent_sql, + "child_with_contract.sql": fixtures.column_constraint_gate_child_with_contract_sql, + "schema.yml": fixtures.column_constraint_gate_child_with_contract_schema_yml, + } + + def test_constraints_apply_and_survive_rerun(self, project): + util.run_dbt(["run"]) + + pk_rows = _constraint_rows(project, "child_with_contract", "PRIMARY KEY") + assert len(pk_rows) == 1, ( + f"Expected one PRIMARY KEY on child_with_contract after first run, found {pk_rows}" + ) + + pk_cols = _pk_columns(project, "child_with_contract") + assert pk_cols == ["hashkey", "load_timestamp"], ( + f"Expected PK columns ['hashkey', 'load_timestamp'], got {pk_cols}" + ) + + fk_rows = _constraint_rows(project, "child_with_contract", "FOREIGN KEY") + assert len(fk_rows) == 1, ( + f"Expected one FOREIGN KEY on child_with_contract after first run, found {fk_rows}" + ) + + util.run_dbt(["run", "--select", "child_with_contract"]) + + pk_rows_after = _constraint_rows(project, "child_with_contract", "PRIMARY KEY") + assert len(pk_rows_after) == 1, ( + f"Expected PRIMARY KEY to survive the second run, found {pk_rows_after}" + ) + pk_cols_after = _pk_columns(project, "child_with_contract") + assert pk_cols_after == ["hashkey", "load_timestamp"], ( + f"Expected PK columns preserved after re-run, got {pk_cols_after}" + ) + fk_rows_after = _constraint_rows(project, "child_with_contract", "FOREIGN KEY") + assert len(fk_rows_after) == 1, ( + f"Expected FOREIGN KEY to survive the second run, found {fk_rows_after}" + ) diff --git a/tests/unit/test_constraints.py b/tests/unit/test_constraints.py index e0d9e1ab4..fa9ff14e4 100644 --- a/tests/unit/test_constraints.py +++ b/tests/unit/test_constraints.py @@ -7,6 +7,7 @@ ) from dbt_common.exceptions import DbtValidationError +from dbt.adapters.databricks.column import DatabricksColumn from dbt.adapters.databricks.constraints import ( CheckConstraint, CustomConstraint, @@ -22,6 +23,7 @@ process_constraint, validate_constraint, ) +from dbt.adapters.databricks.impl import DatabricksAdapter class FakeConstraint(TypedConstraint): @@ -319,3 +321,65 @@ def test_parse_constraints__constraints(self): CustomConstraint(type=ConstraintType.custom, expression="1 = 1"), ], ) == parse_constraints(columns, constraints) + + +class TestParseColumnsAndConstraintsGate: + @staticmethod + def _existing_columns(): + return [DatabricksColumn(column="id", dtype="int")] + + @staticmethod + def _model_columns_with_fk(): + return { + "id": { + "name": "id", + "data_type": "int", + "constraints": [ + {"type": "not_null"}, + { + "type": "foreign_key", + "name": "fk_id", + "to": "parent", + "to_columns": ["id"], + }, + ], + } + } + + def test_skips_column_constraints_when_not_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=False, + ) + assert parsed == [] + + def test_skips_column_not_null_when_not_enforced(self): + enriched, _ = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=False, + ) + assert all(not getattr(col, "not_null", False) for col in enriched) + + def test_parses_column_constraints_when_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=True, + ) + assert len(parsed) == 1 + assert isinstance(parsed[0], ForeignKeyConstraint) + assert parsed[0].name == "fk_id" + assert parsed[0].columns == ["id"] + + def test_defaults_to_not_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + ) + assert parsed == [] From 2601dd0d528e0b74d8c87614c0af348778b26003 Mon Sep 17 00:00:00 2001 From: Shubham Dhal Date: Wed, 20 May 2026 12:10:33 +0530 Subject: [PATCH 2/3] chore: backfill PR #1470 into changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b190b95c9..68e459791 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixes -- Gate column-level constraints on `contract.enforced` to match the existing model-level gate, ensuring column-level NOT NULL / PK / FK / CHECK constraints are only applied when `contract.enforced: true` under `use_materialization_v2: true` ([#TBD](https://github.com/databricks/dbt-databricks/pull/TBD) closes [#1381](https://github.com/databricks/dbt-databricks/issues/1381)) +- Gate column-level constraints on `contract.enforced` to match the existing model-level gate, ensuring column-level NOT NULL / PK / FK / CHECK constraints are only applied when `contract.enforced: true` under `use_materialization_v2: true` ([#1470](https://github.com/databricks/dbt-databricks/pull/1470) closes [#1381](https://github.com/databricks/dbt-databricks/issues/1381)) ### Under the Hood From eb47a4a205ad0ce33b3e0eee1098b3e4c81c5ddb Mon Sep 17 00:00:00 2001 From: Shubham Dhal Date: Thu, 21 May 2026 13:36:17 +0530 Subject: [PATCH 3/3] test: opt streaming_table fixture into contract.enforced The not_null on id was being applied silently via the pre-PR leak. With column-level constraints now gated by contract.enforced, the fixture must opt in explicitly to keep test_streaming_table_create asserting that the constraint reaches the database. Mirrors the existing materialized_view_schema fixture. --- tests/functional/adapter/streaming_tables/fixtures.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/functional/adapter/streaming_tables/fixtures.py b/tests/functional/adapter/streaming_tables/fixtures.py index 9bfb00faf..50c704543 100644 --- a/tests/functional/adapter/streaming_tables/fixtures.py +++ b/tests/functional/adapter/streaming_tables/fixtures.py @@ -60,11 +60,15 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: - name: my_streaming_table columns: - name: id + data_type: bigint description: "The unique identifier for each record" constraints: - type: not_null - name: value + data_type: bigint config: + contract: + enforced: true persist_docs: relation: true columns: true