Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
- Fix missing f-string prefix in `JobRunsApi.submit` debug log ([#1471](https://github.com/databricks/dbt-databricks/pull/1471))
- Fix capability-branching macros falling through to their legacy path at parse/compile time on SQL warehouses. The parse-time stub of `has_dbr_capability` now returns `True` on warehouse profiles for capabilities flagged `sql_warehouse_supported`, so macros select the modern branch during compilation instead of the legacy fallback. ([#1449](https://github.com/databricks/dbt-databricks/pull/1449) closes [#1331](https://github.com/databricks/dbt-databricks/issues/1331))
- Fix snapshots not applying `databricks_tags` on columns ([#1442](https://github.com/databricks/dbt-databricks/pull/1442) closes [#1441](https://github.com/databricks/dbt-databricks/issues/1441))
- Gate column-level constraints on `contract.enforced` to match the existing model-level gate, ensuring column-level NOT NULL / PK / FK / CHECK constraints are only applied when `contract.enforced: true` under `use_materialization_v2: true` ([#1470](https://github.com/databricks/dbt-databricks/pull/1470) closes [#1381](https://github.com/databricks/dbt-databricks/issues/1381))

### Under the Hood

- Defer SDK `Config` construction to connection-open time so offline paths (`dbt parse`/`list`/`compile`) don't trigger the host-metadata probe introduced in `databricks-sdk>=0.103`; as a side effect, auth errors now surface at first connection rather than during profile parsing. ([#1474](https://github.com/databricks/dbt-databricks/pull/1474))
- Bump ceilings on `databricks-sdk` (now `<0.105.0`) and `databricks-sql-connector[pyarrow]` (now `<4.3.0`) to admit newer releases; floors unchanged. ([#1474](https://github.com/databricks/dbt-databricks/pull/1474))
- **BREAKING:** users who relied on column-level constraints (NOT NULL, primary key, foreign key, check) being applied under `use_materialization_v2: true` without `contract.enforced: true` must now set `contract.enforced: true` explicitly on the model.

## dbt-databricks 1.12.0 (May 18, 2026)

Expand Down
11 changes: 8 additions & 3 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,12 +996,17 @@ def parse_columns_and_constraints(
existing_columns: list[DatabricksColumn],
model_columns: dict[str, dict[str, Any]],
model_constraints: list[dict[str, Any]],
contract_enforced: bool = False,
) -> tuple[list[DatabricksColumn], list[constraints.TypedConstraint]]:
"""Returns a list of columns that have been updated with features for table create."""
enriched_columns = []
not_null_set, parsed_constraints = constraints.parse_constraints(
list(model_columns.values()), model_constraints
)
if contract_enforced:
not_null_set, parsed_constraints = constraints.parse_constraints(
list(model_columns.values()), model_constraints
)
else:
not_null_set = set()
parsed_constraints = []

# Create a case-insensitive lookup for model column names
model_columns_lower = {k.lower(): k for k in model_columns.keys()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
{%- set columns = adapter.get_columns_in_relation(temp_relation) -%}
{%- set model_columns = model.get('columns', {}) -%}
{%- set contract_config = config.get('contract') -%}
{%- if contract_config and contract_config.enforced -%}
{%- set contract_enforced = contract_config and contract_config.enforced -%}
{%- if contract_enforced -%}
{%- do exceptions.warn(
"contract.enforced=true on materialized_view '" ~ model.name ~ "': not supported by dbt (https://docs.getdbt.com/docs/mesh/govern/model-contracts). dbt-databricks provides best-effort support that may change without notice."
) -%}
{%- set model_constraints = model.get('constraints', []) -%}
{%- else -%}
{%- set model_constraints = [] -%}
{%- endif -%}
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints) -%}
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, model_constraints, contract_enforced) -%}
{%- set target_relation = relation.enrich(columns_and_constraints[1]) -%}

create or replace materialized view {{ target_relation.render() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

{%- set columns = adapter.get_columns_in_relation(temp_relation) -%}
{%- set model_columns = model.get('columns', {}) -%}
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, []) -%}
{%- set contract_config = config.get('contract') -%}
{%- set contract_enforced = contract_config and contract_config.enforced -%}
{%- set columns_and_constraints = adapter.parse_columns_and_constraints(columns, model_columns, [], contract_enforced) -%}

{#-- We don't enrich the relation with model constraints because they are not supported for streaming tables --#}
CREATE STREAMING TABLE {{ relation.render() }}
Expand Down
5 changes: 3 additions & 2 deletions dbt/include/databricks/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
{% set model_columns = model.get('columns', []) %}
{% set existing_columns = adapter.get_columns_in_relation(intermediate_relation) %}
{% set contract_config = config.get('contract') %}
{% if contract_config and contract_config.enforced %}
{% set contract_enforced = contract_config and contract_config.enforced %}
{% if contract_enforced %}
{% set model_constraints = model.get('constraints', []) %}
{% else %}
{% set model_constraints = [] %}
{% endif %}
{% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints) %}
{% set columns_and_constraints = adapter.parse_columns_and_constraints(existing_columns, model_columns, model_constraints, contract_enforced) %}
{% set target_relation = relation.enrich(columns_and_constraints[1]) %}

{% call statement('main') %}
Expand Down
96 changes: 96 additions & 0 deletions tests/functional/adapter/constraints/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,99 @@
'blue' as color,
'2019-01-01' as date_day
"""


column_constraint_gate_parent_sql = "select cast(1 as int) as id"

_column_constraint_gate_parent_model_yml = """\
- name: parent_table
config:
materialized: table
contract:
enforced: true
columns:
- name: id
data_type: int
constraints:
- type: not_null
- type: primary_key
name: pk_parent_table
"""

column_constraint_gate_child_sql = """
select
cast(x'00' as binary) as hashkey,
cast('2026-01-01' as timestamp) as load_timestamp,
cast('seed' as string) as record_source,
cast(1 as int) as id
"""

column_constraint_gate_child_schema_yml = f"""
version: 2
models:
{_column_constraint_gate_parent_model_yml} - name: child_table
config:
materialized: table
constraints:
- type: primary_key
name: pk_child_table
columns: ["hashkey", "load_timestamp"]
warn_unsupported: false
columns:
- name: hashkey
constraints:
- type: not_null
- name: load_timestamp
constraints:
- type: not_null
- name: record_source
- name: id
constraints:
- type: foreign_key
name: fk_child_table_id
to: ref('parent_table')
to_columns: [id]
"""

column_constraint_gate_child_with_contract_sql = """
{{ config(materialized='incremental') }}
select
cast(x'00' as binary) as hashkey,
cast('2026-01-01' as timestamp) as load_timestamp,
cast('seed' as string) as record_source,
cast(1 as int) as id
"""

column_constraint_gate_child_with_contract_schema_yml = f"""
version: 2
models:
{_column_constraint_gate_parent_model_yml} - name: child_with_contract
config:
materialized: incremental
on_schema_change: append_new_columns
contract:
enforced: true
constraints:
- type: primary_key
name: pk_child_with_contract
columns: ["hashkey", "load_timestamp"]
warn_unsupported: false
columns:
- name: hashkey
data_type: binary
constraints:
- type: not_null
- name: load_timestamp
data_type: timestamp
constraints:
- type: not_null
- name: record_source
data_type: string
- name: id
data_type: int
constraints:
- type: foreign_key
name: fk_child_with_contract_id
to: ref('parent_table')
to_columns: [id]
"""
130 changes: 130 additions & 0 deletions tests/functional/adapter/constraints/test_column_constraint_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import pytest
from dbt.tests import util

from tests.functional.adapter.constraints import fixtures
from tests.functional.adapter.fixtures import MaterializationV2Mixin


def _constraint_rows(project, table_name, constraint_type):
db = project.database.lower()
sch = project.test_schema.lower()
sql = f"""
SELECT constraint_name
FROM `{db}`.information_schema.table_constraints
WHERE table_catalog = '{db}'
AND table_schema = '{sch}'
AND table_name = '{table_name.lower()}'
AND constraint_type = '{constraint_type}'
"""
return project.run_sql(sql, fetch="all")


def _pk_columns(project, table_name):
db = project.database.lower()
sch = project.test_schema.lower()
sql = f"""
SELECT kcu.column_name
FROM `{db}`.information_schema.key_column_usage kcu
WHERE kcu.table_catalog = '{db}'
AND kcu.table_schema = '{sch}'
AND kcu.table_name = '{table_name.lower()}'
AND kcu.constraint_name IN (
SELECT constraint_name
FROM `{db}`.information_schema.table_constraints
WHERE table_catalog = '{db}'
AND table_schema = '{sch}'
AND table_name = '{table_name.lower()}'
AND constraint_type = 'PRIMARY KEY'
)
ORDER BY kcu.ordinal_position
"""
return [row[0] for row in project.run_sql(sql, fetch="all")]


def _not_null_columns(project, table_name):
db = project.database.lower()
sch = project.test_schema.lower()
sql = f"""
SELECT column_name
FROM `{db}`.information_schema.columns
WHERE table_catalog = '{db}'
AND table_schema = '{sch}'
AND table_name = '{table_name.lower()}'
AND is_nullable = 'NO'
"""
return [row[0] for row in project.run_sql(sql, fetch="all")]


@pytest.mark.skip_profile("databricks_cluster")
class TestNoConstraintsWithoutContractEnforcement(MaterializationV2Mixin):
@pytest.fixture(scope="class")
def models(self):
return {
"parent_table.sql": fixtures.column_constraint_gate_parent_sql,
"child_table.sql": fixtures.column_constraint_gate_child_sql,
"schema.yml": fixtures.column_constraint_gate_child_schema_yml,
}

def test_neither_column_nor_model_constraints_are_applied(self, project):
util.run_dbt(["run"])

pk_rows = _constraint_rows(project, "child_table", "PRIMARY KEY")
assert len(pk_rows) == 0, (
f"Expected no PRIMARY KEY on child_table without contract.enforced, found {pk_rows}"
)

fk_rows = _constraint_rows(project, "child_table", "FOREIGN KEY")
assert len(fk_rows) == 0, (
f"Expected no FOREIGN KEY on child_table without contract.enforced "
f"(column-level FK must be gated), found {fk_rows}"
)

not_null_cols = _not_null_columns(project, "child_table")
assert not_null_cols == [], (
f"Expected no NOT NULL columns on child_table without contract.enforced "
f"(column-level not_null must be gated), found {not_null_cols}"
)


@pytest.mark.skip_profile("databricks_cluster")
class TestConstraintsApplyWithContractEnforced(MaterializationV2Mixin):
@pytest.fixture(scope="class")
def models(self):
return {
"parent_table.sql": fixtures.column_constraint_gate_parent_sql,
"child_with_contract.sql": fixtures.column_constraint_gate_child_with_contract_sql,
"schema.yml": fixtures.column_constraint_gate_child_with_contract_schema_yml,
}

def test_constraints_apply_and_survive_rerun(self, project):
util.run_dbt(["run"])

pk_rows = _constraint_rows(project, "child_with_contract", "PRIMARY KEY")
assert len(pk_rows) == 1, (
f"Expected one PRIMARY KEY on child_with_contract after first run, found {pk_rows}"
)

pk_cols = _pk_columns(project, "child_with_contract")
assert pk_cols == ["hashkey", "load_timestamp"], (
f"Expected PK columns ['hashkey', 'load_timestamp'], got {pk_cols}"
)

fk_rows = _constraint_rows(project, "child_with_contract", "FOREIGN KEY")
assert len(fk_rows) == 1, (
f"Expected one FOREIGN KEY on child_with_contract after first run, found {fk_rows}"
)

util.run_dbt(["run", "--select", "child_with_contract"])

pk_rows_after = _constraint_rows(project, "child_with_contract", "PRIMARY KEY")
assert len(pk_rows_after) == 1, (
f"Expected PRIMARY KEY to survive the second run, found {pk_rows_after}"
)
pk_cols_after = _pk_columns(project, "child_with_contract")
assert pk_cols_after == ["hashkey", "load_timestamp"], (
f"Expected PK columns preserved after re-run, got {pk_cols_after}"
)
fk_rows_after = _constraint_rows(project, "child_with_contract", "FOREIGN KEY")
assert len(fk_rows_after) == 1, (
f"Expected FOREIGN KEY to survive the second run, found {fk_rows_after}"
)
4 changes: 4 additions & 0 deletions tests/functional/adapter/streaming_tables/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]:
- name: my_streaming_table
columns:
- name: id
data_type: bigint
description: "The unique identifier for each record"
constraints:
- type: not_null
- name: value
data_type: bigint
config:
contract:
enforced: true
persist_docs:
relation: true
columns: true
Expand Down
Loading
Loading