Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

### Features

- Add catalogs.yml v2 support (requires `use_catalogs_v2: true` in dbt-core)
- Add `invocation_id` to the default query comment ([#1377](https://github.com/databricks/dbt-databricks/issues/1377))

### Fixes
Expand Down
8 changes: 8 additions & 0 deletions dbt/adapters/databricks/catalogs/_hive_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtValidationError

from dbt.adapters.databricks import constants, parse_model
from dbt.adapters.databricks.catalogs._relation import DatabricksCatalogRelation

# File formats accepted by HiveMetastoreCatalogIntegration.__init__; the
# configured value is lowercased before being checked against this set.
_VALID_HIVE_FILE_FORMATS = {"delta", "parquet", "hudi"}


class HiveMetastoreCatalogIntegration(CatalogIntegration):
catalog_type = constants.HIVE_METASTORE_CATALOG_TYPE
Expand All @@ -14,6 +17,11 @@ class HiveMetastoreCatalogIntegration(CatalogIntegration):
def __init__(self, config: CatalogIntegrationConfig) -> None:
    """Initialize the integration and validate the configured file format.

    Args:
        config: Catalog integration configuration. ``name`` and
            ``file_format`` are read here.

    Raises:
        DbtValidationError: If ``file_format`` is set (truthy) and its
            lowercased form is not in ``_VALID_HIVE_FILE_FORMATS``.
    """
    super().__init__(config)
    self.file_format: Optional[str] = config.file_format

    # An unset or empty file_format is accepted without further checks.
    if not config.file_format:
        return
    # Membership is tested case-insensitively; the stored value keeps its case.
    if config.file_format.lower() in _VALID_HIVE_FILE_FORMATS:
        return

    allowed = sorted(_VALID_HIVE_FILE_FORMATS)
    raise DbtValidationError(
        f"Catalog '{config.name}' hive_metastore/databricks file_format "
        f"must be one of {allowed}, got '{config.file_format}'"
    )

@property
def location_root(self) -> Optional[str]:
Expand Down
18 changes: 17 additions & 1 deletion dbt/adapters/databricks/catalogs/_unity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtValidationError

from dbt.adapters.databricks import constants, parse_model
from dbt.adapters.databricks.catalogs._relation import DatabricksCatalogRelation
Expand All @@ -13,9 +14,24 @@ class UnityCatalogIntegration(CatalogIntegration):

def __init__(self, config: CatalogIntegrationConfig) -> None:
    """Initialize the integration and validate its Unity-specific settings.

    Args:
        config: Catalog integration configuration. ``name``, ``file_format``
            and the ``location_root`` / ``use_uniform`` entries of
            ``adapter_properties`` are read here.

    Raises:
        DbtValidationError: If ``location_root`` is present but blank; if
            ``use_uniform`` is truthy and ``file_format`` is not ``delta``
            (including when it is unset); or if ``use_uniform`` is falsy and
            ``file_format`` is set to anything other than ``parquet``.
    """
    super().__init__(config)

    location_root = config.adapter_properties.get("location_root")
    if location_root is not None:
        # Reject "" and whitespace-only values instead of silently ignoring them.
        if not str(location_root).strip():
            raise DbtValidationError(
                f"Catalog '{config.name}' unity/databricks location_root cannot be blank"
            )
        # Only assigned when a location_root was actually provided.
        self.external_volume: Optional[str] = location_root

    self.file_format: Optional[str] = config.file_format

    use_uniform = config.adapter_properties.get("use_uniform", False)
    # Compare case-insensitively; an unset file_format becomes "".
    ff = (config.file_format or "").lower()
    if use_uniform and ff != "delta":
        raise DbtValidationError(
            f"Catalog '{config.name}' unity/databricks use_uniform: true requires file_format: delta"
        )
    # Without use_uniform an unset file_format is allowed; a set one must be parquet.
    if not use_uniform and ff and ff != "parquet":
        raise DbtValidationError(
            f"Catalog '{config.name}' unity/databricks use_uniform: false (or unset) requires file_format: parquet"
        )

@property
def location_root(self) -> Optional[str]:
Expand Down
8 changes: 8 additions & 0 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,18 @@ class DatabricksAdapter(SparkAdapter):
{
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.CatalogsV2: CapabilitySupport(support=Support.Full),
}
)

CATALOG_INTEGRATIONS = [
HiveMetastoreCatalogIntegration,
UnityCatalogIntegration,
]
# Lookup table consulted by _v2_to_v1_type: maps the catalog type names
# "unity" and "hive_metastore" to this adapter's catalog-type constants.
# Names not listed here are returned unchanged by that method.
# NOTE(review): presumably keys are catalogs.yml v2 type names and values the
# v1 equivalents — confirm against dbt-core.
_V2_TO_V1_TYPE: ClassVar[dict[str, str]] = {
"unity": constants.UNITY_CATALOG_TYPE,
"hive_metastore": constants.HIVE_METASTORE_CATALOG_TYPE,
}
CONSTRAINT_SUPPORT = constraints.CONSTRAINT_SUPPORT

get_column_behavior: GetColumnsBehavior
Expand Down Expand Up @@ -289,6 +294,9 @@ def _has_dbr_capability_parse(self, capability_name: str) -> bool:
return False
return DBRCapabilities(is_sql_warehouse=True).has_capability(capability)

def _v2_to_v1_type(self, catalog_type: str) -> str:
    """Translate a catalog type name through ``_V2_TO_V1_TYPE``.

    Args:
        catalog_type: The catalog type name to translate.

    Returns:
        The mapped value when ``catalog_type`` is a key of
        ``_V2_TO_V1_TYPE``; otherwise ``catalog_type`` unchanged.
    """
    known_types = self._V2_TO_V1_TYPE
    if catalog_type in known_types:
        return known_types[catalog_type]
    # Unknown names pass straight through.
    return catalog_type

@property
def _behavior_flags(self) -> list[BehaviorFlag]:
return [
Expand Down
120 changes: 120 additions & 0 deletions tests/unit/test_catalogs_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import pytest

from dbt.adapters.capability import Capability, Support
from dbt.adapters.databricks.catalogs import (
HiveMetastoreCatalogIntegration,
UnityCatalogIntegration,
)
from dbt.adapters.databricks.impl import DatabricksAdapter
from dbt_common.exceptions import DbtValidationError


@dataclass
class _Config:
    """Lightweight stand-in for CatalogIntegrationConfig.

    Carries only the attributes the tests in this module set or rely on
    when exercising the catalog integrations' ``__init__`` validation.
    """

    # Identification of the catalog entry.
    name: str = "test_cat"
    catalog_type: str = "unity"
    catalog_name: Optional[str] = None

    # Storage-related settings.
    table_format: Optional[str] = "iceberg"
    external_volume: Optional[str] = None
    file_format: Optional[str] = None

    # Fresh dict per instance so tests never share adapter properties.
    adapter_properties: Dict[str, Any] = field(default_factory=dict)


# ===== Adapter-level =====


def test_catalogs_v2_capability_declared():
    """The adapter's capability table lists CatalogsV2 with full support."""
    declared = DatabricksAdapter._capabilities[Capability.CatalogsV2]
    assert declared.support == Support.Full


def test_v2_to_v1_type_unity():
    """``_v2_to_v1_type`` returns "unity" for the "unity" type name."""
    # object.__new__ builds the adapter without running its __init__.
    bare_adapter = object.__new__(DatabricksAdapter)
    translated = bare_adapter._v2_to_v1_type("unity")
    assert translated == "unity"


def test_v2_to_v1_type_hive_metastore():
    """``_v2_to_v1_type`` returns "hive_metastore" for that type name."""
    # object.__new__ builds the adapter without running its __init__.
    bare_adapter = object.__new__(DatabricksAdapter)
    translated = bare_adapter._v2_to_v1_type("hive_metastore")
    assert translated == "hive_metastore"


def test_v2_to_v1_type_unknown_passthrough():
    """A type name with no mapping is returned unchanged."""
    # object.__new__ builds the adapter without running its __init__.
    bare_adapter = object.__new__(DatabricksAdapter)
    translated = bare_adapter._v2_to_v1_type("custom_type")
    assert translated == "custom_type"


# ===== UnityCatalogIntegration =====


def test_unity_parquet_without_uniform():
    """Unity accepts file_format "parquet" when use_uniform is not set."""
    catalog = UnityCatalogIntegration(_Config(file_format="parquet"))
    assert catalog.file_format == "parquet"


def test_unity_delta_with_uniform():
    """Unity accepts file_format "delta" when use_uniform is True."""
    uniform_props = {"use_uniform": True}
    catalog = UnityCatalogIntegration(
        _Config(file_format="delta", adapter_properties=uniform_props)
    )
    assert catalog.file_format == "delta"


def test_unity_with_location_root():
    """A non-blank location_root is stored as ``external_volume``."""
    location_props = {"location_root": "/mnt/data"}
    catalog = UnityCatalogIntegration(
        _Config(file_format="parquet", adapter_properties=location_props)
    )
    assert catalog.external_volume == "/mnt/data"


def test_unity_delta_without_uniform_raises():
    """Unity rejects file_format "delta" when use_uniform is not set."""
    delta_only = _Config(file_format="delta")
    expected_message = "use_uniform: false.*requires file_format: parquet"
    with pytest.raises(DbtValidationError, match=expected_message):
        UnityCatalogIntegration(delta_only)


def test_unity_parquet_with_uniform_raises():
    """Unity rejects file_format "parquet" when use_uniform is True."""
    parquet_uniform = _Config(
        file_format="parquet",
        adapter_properties={"use_uniform": True},
    )
    expected_message = "use_uniform: true.*requires file_format: delta"
    with pytest.raises(DbtValidationError, match=expected_message):
        UnityCatalogIntegration(parquet_uniform)


def test_unity_blank_location_root_raises():
    """A whitespace-only location_root is rejected."""
    whitespace_root = _Config(
        file_format="parquet",
        adapter_properties={"location_root": " "},
    )
    expected_message = "location_root cannot be blank"
    with pytest.raises(DbtValidationError, match=expected_message):
        UnityCatalogIntegration(whitespace_root)


def test_unity_empty_location_root_raises():
    """An empty-string location_root is rejected."""
    empty_root = _Config(
        file_format="parquet",
        adapter_properties={"location_root": ""},
    )
    expected_message = "location_root cannot be blank"
    with pytest.raises(DbtValidationError, match=expected_message):
        UnityCatalogIntegration(empty_root)


# ===== HiveMetastoreCatalogIntegration =====


def test_hive_delta_valid():
    """Hive metastore accepts file_format "delta"."""
    catalog = HiveMetastoreCatalogIntegration(
        _Config(catalog_type="hive_metastore", file_format="delta")
    )
    assert catalog.file_format == "delta"


def test_hive_parquet_valid():
    """Hive metastore accepts file_format "parquet"."""
    catalog = HiveMetastoreCatalogIntegration(
        _Config(catalog_type="hive_metastore", file_format="parquet")
    )
    assert catalog.file_format == "parquet"


def test_hive_hudi_valid():
    """Hive metastore accepts file_format "hudi"."""
    catalog = HiveMetastoreCatalogIntegration(
        _Config(catalog_type="hive_metastore", file_format="hudi")
    )
    assert catalog.file_format == "hudi"


def test_hive_invalid_file_format_raises():
    """Hive metastore rejects a file_format outside the accepted set."""
    unsupported = _Config(catalog_type="hive_metastore", file_format="avro")
    expected_message = "file_format"
    with pytest.raises(DbtValidationError, match=expected_message):
        HiveMetastoreCatalogIntegration(unsupported)