diff --git a/CHANGELOG.md b/CHANGELOG.md index 125eb9806..a929f7ff2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,13 @@ - Fix missing f-string prefix in `JobRunsApi.submit` debug log ([#1471](https://github.com/databricks/dbt-databricks/pull/1471)) - Fix capability-branching macros falling through to their legacy path at parse/compile time on SQL warehouses. The parse-time stub of `has_dbr_capability` now returns `True` on warehouse profiles for capabilities flagged `sql_warehouse_supported`, so macros select the modern branch during compilation instead of the legacy fallback. ([#1449](https://github.com/databricks/dbt-databricks/pull/1449) closes [#1331](https://github.com/databricks/dbt-databricks/issues/1331)) - Fix snapshots not applying `databricks_tags` on columns ([#1442](https://github.com/databricks/dbt-databricks/pull/1442) closes [#1441](https://github.com/databricks/dbt-databricks/issues/1441)) +- Gate column-level constraints on `contract.enforced` to match the existing model-level gate, ensuring column-level NOT NULL / PK / FK / CHECK constraints are only applied when `contract.enforced: true` under `use_materialization_v2: true` ([#1470](https://github.com/databricks/dbt-databricks/pull/1470) closes [#1381](https://github.com/databricks/dbt-databricks/issues/1381)) ### Under the Hood - Defer SDK `Config` construction to connection-open time so offline paths (`dbt parse`/`list`/`compile`) don't trigger the host-metadata probe introduced in `databricks-sdk>=0.103`; as a side effect, auth errors now surface at first connection rather than during profile parsing. ([#1474](https://github.com/databricks/dbt-databricks/pull/1474)) - Bump ceilings on `databricks-sdk` (now `<0.105.0`) and `databricks-sql-connector[pyarrow]` (now `<4.3.0`) to admit newer releases; floors unchanged. 
([#1474](https://github.com/databricks/dbt-databricks/pull/1474)) +- **BREAKING:** Column-level constraints (NOT NULL, primary key, foreign key, check) are no longer applied under `use_materialization_v2: true` unless the model also sets `contract.enforced: true`. Users who relied on the previous behavior must now set `contract.enforced: true` explicitly on the model. ([#1470](https://github.com/databricks/dbt-databricks/pull/1470)) ## dbt-databricks 1.12.0 (May 18, 2026) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index cc9be7a5b..b98e35494 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -996,12 +996,17 @@ def parse_columns_and_constraints( existing_columns: list[DatabricksColumn], model_columns: dict[str, dict[str, Any]], model_constraints: list[dict[str, Any]], + contract_enforced: bool = False, ) -> tuple[list[DatabricksColumn], list[constraints.TypedConstraint]]: """Returns a list of columns that have been updated with features for table create.""" enriched_columns = [] - not_null_set, parsed_constraints = constraints.parse_constraints( - list(model_columns.values()), model_constraints - ) + if contract_enforced: + not_null_set, parsed_constraints = constraints.parse_constraints( + list(model_columns.values()), model_constraints + ) + else: + not_null_set = set() + parsed_constraints = [] # Create a case-insensitive lookup for model column names model_columns_lower = {k.lower(): k for k in model_columns.keys()} diff --git a/dbt/include/databricks/macros/relations/materialized_view/create.sql b/dbt/include/databricks/macros/relations/materialized_view/create.sql index 78adba652..f1a4995d6 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/create.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/create.sql @@ -23,7 +23,8 @@ {%- set columns = adapter.get_columns_in_relation(temp_relation) -%} {%- set model_columns = model.get('columns', {}) -%} {%- set contract_config = config.get('contract') -%} - {%- if contract_config and contract_config.enforced -%} + {%- set 
contract_enforced = contract_config and contract_config.enforced -%} + {%- if contract_enforced -%} {%- do exceptions.warn( "contract.enforced=true on materialized_view '" ~ model.name ~ "': not supported by dbt (https://docs.getdbt.com/docs/mesh/govern/model-contracts). dbt-databricks provides best-effort support that may change without notice." ) -%} @@ -31,7 +32,7 @@ {%- else -%} {%- set model_constraints = [] -%} {%- endif -%} - {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints) -%} + {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints, contract_enforced) -%} {%- set target_relation = relation.enrich(columns_and_constraints[1]) -%} create or replace materialized view {{ target_relation.render() }} diff --git a/dbt/include/databricks/macros/relations/streaming_table/create.sql b/dbt/include/databricks/macros/relations/streaming_table/create.sql index ec18565c4..2e95229a7 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/create.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/create.sql @@ -24,7 +24,9 @@ {%- set columns = adapter.get_columns_in_relation(temp_relation) -%} {%- set model_columns = model.get('columns', {}) -%} - {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, []) -%} + {%- set contract_config = config.get('contract') -%} + {%- set contract_enforced = contract_config and contract_config.enforced -%} + {%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, [], contract_enforced) -%} {#-- We don't enrich the relation with model constraints because they are not supported for streaming tables --#} CREATE STREAMING TABLE {{ relation.render() }} diff --git a/dbt/include/databricks/macros/relations/table/create.sql b/dbt/include/databricks/macros/relations/table/create.sql index aa62a5255..f82f33a93 100644 --- 
a/dbt/include/databricks/macros/relations/table/create.sql +++ b/dbt/include/databricks/macros/relations/table/create.sql @@ -3,12 +3,13 @@ {% set model_columns = model.get('columns', []) %} {% set existing_columns = adapter.get_columns_in_relation(intermediate_relation) %} {% set contract_config = config.get('contract') %} - {% if contract_config and contract_config.enforced %} + {% set contract_enforced = contract_config and contract_config.enforced %} + {% if contract_enforced %} {% set model_constraints = model.get('constraints', []) %} {% else %} {% set model_constraints = [] %} {% endif %} - {% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints) %} + {% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints, contract_enforced) %} {% set target_relation = relation.enrich(columns_and_constraints[1]) %} {% call statement('main') %} diff --git a/tests/functional/adapter/constraints/fixtures.py b/tests/functional/adapter/constraints/fixtures.py index 200e67381..834bb3155 100644 --- a/tests/functional/adapter/constraints/fixtures.py +++ b/tests/functional/adapter/constraints/fixtures.py @@ -222,3 +222,99 @@ 'blue' as color, '2019-01-01' as date_day """ + + +column_constraint_gate_parent_sql = "select cast(1 as int) as id" + +_column_constraint_gate_parent_model_yml = """\ + - name: parent_table + config: + materialized: table + contract: + enforced: true + columns: + - name: id + data_type: int + constraints: + - type: not_null + - type: primary_key + name: pk_parent_table +""" + +column_constraint_gate_child_sql = """ +select + cast(x'00' as binary) as hashkey, + cast('2026-01-01' as timestamp) as load_timestamp, + cast('seed' as string) as record_source, + cast(1 as int) as id +""" + +column_constraint_gate_child_schema_yml = f""" +version: 2 +models: +{_column_constraint_gate_parent_model_yml} - name: child_table + config: + 
materialized: table + constraints: + - type: primary_key + name: pk_child_table + columns: ["hashkey", "load_timestamp"] + warn_unsupported: false + columns: + - name: hashkey + constraints: + - type: not_null + - name: load_timestamp + constraints: + - type: not_null + - name: record_source + - name: id + constraints: + - type: foreign_key + name: fk_child_table_id + to: ref('parent_table') + to_columns: [id] +""" + +column_constraint_gate_child_with_contract_sql = """ +{{ config(materialized='incremental') }} +select + cast(x'00' as binary) as hashkey, + cast('2026-01-01' as timestamp) as load_timestamp, + cast('seed' as string) as record_source, + cast(1 as int) as id +""" + +column_constraint_gate_child_with_contract_schema_yml = f""" +version: 2 +models: +{_column_constraint_gate_parent_model_yml} - name: child_with_contract + config: + materialized: incremental + on_schema_change: append_new_columns + contract: + enforced: true + constraints: + - type: primary_key + name: pk_child_with_contract + columns: ["hashkey", "load_timestamp"] + warn_unsupported: false + columns: + - name: hashkey + data_type: binary + constraints: + - type: not_null + - name: load_timestamp + data_type: timestamp + constraints: + - type: not_null + - name: record_source + data_type: string + - name: id + data_type: int + constraints: + - type: foreign_key + name: fk_child_with_contract_id + to: ref('parent_table') + to_columns: [id] +""" diff --git a/tests/functional/adapter/constraints/test_column_constraint_gate.py b/tests/functional/adapter/constraints/test_column_constraint_gate.py new file mode 100644 index 000000000..5e52ef196 --- /dev/null +++ b/tests/functional/adapter/constraints/test_column_constraint_gate.py @@ -0,0 +1,130 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.constraints import fixtures +from tests.functional.adapter.fixtures import MaterializationV2Mixin + + +def _constraint_rows(project, table_name, constraint_type): + db = 
project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT constraint_name + FROM `{db}`.information_schema.table_constraints + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND constraint_type = '{constraint_type}' + """ + return project.run_sql(sql, fetch="all") + + +def _pk_columns(project, table_name): + db = project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT kcu.column_name + FROM `{db}`.information_schema.key_column_usage kcu + WHERE kcu.table_catalog = '{db}' + AND kcu.table_schema = '{sch}' + AND kcu.table_name = '{table_name.lower()}' + AND kcu.constraint_name IN ( + SELECT constraint_name + FROM `{db}`.information_schema.table_constraints + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND constraint_type = 'PRIMARY KEY' + ) + ORDER BY kcu.ordinal_position + """ + return [row[0] for row in project.run_sql(sql, fetch="all")] + + +def _not_null_columns(project, table_name): + db = project.database.lower() + sch = project.test_schema.lower() + sql = f""" + SELECT column_name + FROM `{db}`.information_schema.columns + WHERE table_catalog = '{db}' + AND table_schema = '{sch}' + AND table_name = '{table_name.lower()}' + AND is_nullable = 'NO' + """ + return [row[0] for row in project.run_sql(sql, fetch="all")] + + +@pytest.mark.skip_profile("databricks_cluster") +class TestNoConstraintsWithoutContractEnforcement(MaterializationV2Mixin): + @pytest.fixture(scope="class") + def models(self): + return { + "parent_table.sql": fixtures.column_constraint_gate_parent_sql, + "child_table.sql": fixtures.column_constraint_gate_child_sql, + "schema.yml": fixtures.column_constraint_gate_child_schema_yml, + } + + def test_neither_column_nor_model_constraints_are_applied(self, project): + util.run_dbt(["run"]) + + pk_rows = _constraint_rows(project, "child_table", "PRIMARY KEY") + assert len(pk_rows) == 0, ( + 
f"Expected no PRIMARY KEY on child_table without contract.enforced, found {pk_rows}" + ) + + fk_rows = _constraint_rows(project, "child_table", "FOREIGN KEY") + assert len(fk_rows) == 0, ( + f"Expected no FOREIGN KEY on child_table without contract.enforced " + f"(column-level FK must be gated), found {fk_rows}" + ) + + not_null_cols = _not_null_columns(project, "child_table") + assert not_null_cols == [], ( + f"Expected no NOT NULL columns on child_table without contract.enforced " + f"(column-level not_null must be gated), found {not_null_cols}" + ) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestConstraintsApplyWithContractEnforced(MaterializationV2Mixin): + @pytest.fixture(scope="class") + def models(self): + return { + "parent_table.sql": fixtures.column_constraint_gate_parent_sql, + "child_with_contract.sql": fixtures.column_constraint_gate_child_with_contract_sql, + "schema.yml": fixtures.column_constraint_gate_child_with_contract_schema_yml, + } + + def test_constraints_apply_and_survive_rerun(self, project): + util.run_dbt(["run"]) + + pk_rows = _constraint_rows(project, "child_with_contract", "PRIMARY KEY") + assert len(pk_rows) == 1, ( + f"Expected one PRIMARY KEY on child_with_contract after first run, found {pk_rows}" + ) + + pk_cols = _pk_columns(project, "child_with_contract") + assert pk_cols == ["hashkey", "load_timestamp"], ( + f"Expected PK columns ['hashkey', 'load_timestamp'], got {pk_cols}" + ) + + fk_rows = _constraint_rows(project, "child_with_contract", "FOREIGN KEY") + assert len(fk_rows) == 1, ( + f"Expected one FOREIGN KEY on child_with_contract after first run, found {fk_rows}" + ) + + util.run_dbt(["run", "--select", "child_with_contract"]) + + pk_rows_after = _constraint_rows(project, "child_with_contract", "PRIMARY KEY") + assert len(pk_rows_after) == 1, ( + f"Expected PRIMARY KEY to survive the second run, found {pk_rows_after}" + ) + pk_cols_after = _pk_columns(project, "child_with_contract") + assert pk_cols_after 
== ["hashkey", "load_timestamp"], ( + f"Expected PK columns preserved after re-run, got {pk_cols_after}" + ) + fk_rows_after = _constraint_rows(project, "child_with_contract", "FOREIGN KEY") + assert len(fk_rows_after) == 1, ( + f"Expected FOREIGN KEY to survive the second run, found {fk_rows_after}" + ) diff --git a/tests/functional/adapter/streaming_tables/fixtures.py b/tests/functional/adapter/streaming_tables/fixtures.py index 9bfb00faf..50c704543 100644 --- a/tests/functional/adapter/streaming_tables/fixtures.py +++ b/tests/functional/adapter/streaming_tables/fixtures.py @@ -60,11 +60,15 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: - name: my_streaming_table columns: - name: id + data_type: bigint description: "The unique identifier for each record" constraints: - type: not_null - name: value + data_type: bigint config: + contract: + enforced: true persist_docs: relation: true columns: true diff --git a/tests/unit/test_constraints.py b/tests/unit/test_constraints.py index e0d9e1ab4..fa9ff14e4 100644 --- a/tests/unit/test_constraints.py +++ b/tests/unit/test_constraints.py @@ -7,6 +7,7 @@ ) from dbt_common.exceptions import DbtValidationError +from dbt.adapters.databricks.column import DatabricksColumn from dbt.adapters.databricks.constraints import ( CheckConstraint, CustomConstraint, @@ -22,6 +23,7 @@ process_constraint, validate_constraint, ) +from dbt.adapters.databricks.impl import DatabricksAdapter class FakeConstraint(TypedConstraint): @@ -319,3 +321,65 @@ def test_parse_constraints__constraints(self): CustomConstraint(type=ConstraintType.custom, expression="1 = 1"), ], ) == parse_constraints(columns, constraints) + + +class TestParseColumnsAndConstraintsGate: + @staticmethod + def _existing_columns(): + return [DatabricksColumn(column="id", dtype="int")] + + @staticmethod + def _model_columns_with_fk(): + return { + "id": { + "name": "id", + "data_type": "int", + "constraints": [ + {"type": "not_null"}, + { + "type": 
"foreign_key", + "name": "fk_id", + "to": "parent", + "to_columns": ["id"], + }, + ], + } + } + + def test_skips_column_constraints_when_not_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=False, + ) + assert parsed == [] + + def test_skips_column_not_null_when_not_enforced(self): + enriched, _ = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=False, + ) + assert all(not getattr(col, "not_null", False) for col in enriched) + + def test_parses_column_constraints_when_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + contract_enforced=True, + ) + assert len(parsed) == 1 + assert isinstance(parsed[0], ForeignKeyConstraint) + assert parsed[0].name == "fk_id" + assert parsed[0].columns == ["id"] + + def test_defaults_to_not_enforced(self): + _, parsed = DatabricksAdapter.parse_columns_and_constraints( + self._existing_columns(), + self._model_columns_with_fk(), + [], + ) + assert parsed == []