diff --git a/CHANGELOG.md b/CHANGELOG.md index 4836749e1..a1cae2459 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ ### Features +- Add catalogs.yml v2 support (requires `use_catalogs_v2: true` in dbt-core) - Add `invocation_id` to the default query comment ([#1377](https://github.com/databricks/dbt-databricks/issues/1377)) ### Fixes diff --git a/dbt/adapters/databricks/catalogs/_hive_metastore.py b/dbt/adapters/databricks/catalogs/_hive_metastore.py index d5ee31356..caace7c2d 100644 --- a/dbt/adapters/databricks/catalogs/_hive_metastore.py +++ b/dbt/adapters/databricks/catalogs/_hive_metastore.py @@ -2,10 +2,13 @@ from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig from dbt.adapters.contracts.relation import RelationConfig +from dbt_common.exceptions import DbtValidationError from dbt.adapters.databricks import constants, parse_model from dbt.adapters.databricks.catalogs._relation import DatabricksCatalogRelation +_VALID_HIVE_FILE_FORMATS = {"delta", "parquet", "hudi"} + class HiveMetastoreCatalogIntegration(CatalogIntegration): catalog_type = constants.HIVE_METASTORE_CATALOG_TYPE @@ -14,6 +17,11 @@ class HiveMetastoreCatalogIntegration(CatalogIntegration): def __init__(self, config: CatalogIntegrationConfig) -> None: super().__init__(config) self.file_format: Optional[str] = config.file_format + if config.file_format and config.file_format.lower() not in _VALID_HIVE_FILE_FORMATS: + raise DbtValidationError( + f"Catalog '{config.name}' hive_metastore/databricks file_format " + f"must be one of {sorted(_VALID_HIVE_FILE_FORMATS)}, got '{config.file_format}'" + ) @property def location_root(self) -> Optional[str]: diff --git a/dbt/adapters/databricks/catalogs/_unity.py b/dbt/adapters/databricks/catalogs/_unity.py index 9e650bea8..9f55328bf 100644 --- a/dbt/adapters/databricks/catalogs/_unity.py +++ b/dbt/adapters/databricks/catalogs/_unity.py @@ -2,6 +2,7 @@ from dbt.adapters.catalogs import CatalogIntegration, CatalogIntegrationConfig from dbt.adapters.contracts.relation import RelationConfig +from dbt_common.exceptions import DbtValidationError from dbt.adapters.databricks import constants, parse_model from dbt.adapters.databricks.catalogs._relation import DatabricksCatalogRelation @@ -13,9 +14,24 @@ class UnityCatalogIntegration(CatalogIntegration): def __init__(self, config: CatalogIntegrationConfig) -> None: super().__init__(config) - if location_root := config.adapter_properties.get("location_root"): + location_root = config.adapter_properties.get("location_root") + if location_root is not None: + if not str(location_root).strip(): + raise DbtValidationError( + f"Catalog '{config.name}' unity/databricks location_root cannot be blank" + ) self.external_volume: Optional[str] = location_root self.file_format: Optional[str] = config.file_format + use_uniform = config.adapter_properties.get("use_uniform", False) + ff = (config.file_format or "").lower() + if use_uniform and ff != "delta": + raise DbtValidationError( + f"Catalog '{config.name}' unity/databricks use_uniform: true requires file_format: delta" + ) + if not use_uniform and ff and ff != "parquet": + raise DbtValidationError( + f"Catalog '{config.name}' unity/databricks use_uniform: false (or unset) requires file_format: parquet" + ) @property def location_root(self) -> Optional[str]: diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index cc9be7a5b..1ac70ea27 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -244,6 +244,7 @@ class DatabricksAdapter(SparkAdapter): { Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full), + Capability.CatalogsV2: CapabilitySupport(support=Support.Full), } ) @@ -251,6 +252,10 @@ class DatabricksAdapter(SparkAdapter): HiveMetastoreCatalogIntegration, UnityCatalogIntegration, ] + _V2_TO_V1_TYPE: ClassVar[dict[str, str]] = { + "unity": constants.UNITY_CATALOG_TYPE, + "hive_metastore": constants.HIVE_METASTORE_CATALOG_TYPE, + } CONSTRAINT_SUPPORT = constraints.CONSTRAINT_SUPPORT get_column_behavior: GetColumnsBehavior @@ -289,6 +294,9 @@ def _has_dbr_capability_parse(self, capability_name: str) -> bool: return False return DBRCapabilities(is_sql_warehouse=True).has_capability(capability) + def _v2_to_v1_type(self, catalog_type: str) -> str: + return self._V2_TO_V1_TYPE.get(catalog_type, catalog_type) + @property def _behavior_flags(self) -> list[BehaviorFlag]: return [ diff --git a/tests/unit/test_catalogs_v2.py b/tests/unit/test_catalogs_v2.py new file mode 100644 index 000000000..26b6e1fe5 --- /dev/null +++ b/tests/unit/test_catalogs_v2.py @@ -0,0 +1,120 @@ +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + +import pytest + +from dbt.adapters.capability import Capability, Support +from dbt.adapters.databricks.catalogs import ( + HiveMetastoreCatalogIntegration, + UnityCatalogIntegration, +) +from dbt.adapters.databricks.impl import DatabricksAdapter +from dbt_common.exceptions import DbtValidationError + + +@dataclass +class _Config: + """Minimal CatalogIntegrationConfig stub for testing __init__ validation.""" + + name: str = "test_cat" + catalog_type: str = "unity" + catalog_name: Optional[str] = None + table_format: Optional[str] = "iceberg" + external_volume: Optional[str] = None + file_format: Optional[str] = None + adapter_properties: Dict[str, Any] = field(default_factory=dict) + + +# ===== Adapter-level ===== + + +def test_catalogs_v2_capability_declared(): + cap = DatabricksAdapter._capabilities[Capability.CatalogsV2] + assert cap.support == Support.Full + + +def test_v2_to_v1_type_unity(): + adapter = object.__new__(DatabricksAdapter) + assert adapter._v2_to_v1_type("unity") == "unity" + + +def test_v2_to_v1_type_hive_metastore(): + adapter = object.__new__(DatabricksAdapter) + assert adapter._v2_to_v1_type("hive_metastore") == "hive_metastore" + + +def test_v2_to_v1_type_unknown_passthrough(): + adapter = object.__new__(DatabricksAdapter) + assert adapter._v2_to_v1_type("custom_type") == "custom_type" + + +# ===== UnityCatalogIntegration ===== + + +def test_unity_parquet_without_uniform(): + cfg = _Config(file_format="parquet") + integration = UnityCatalogIntegration(cfg) + assert integration.file_format == "parquet" + + +def test_unity_delta_with_uniform(): + cfg = _Config(file_format="delta", adapter_properties={"use_uniform": True}) + integration = UnityCatalogIntegration(cfg) + assert integration.file_format == "delta" + + +def test_unity_with_location_root(): + cfg = _Config(file_format="parquet", adapter_properties={"location_root": "/mnt/data"}) + integration = UnityCatalogIntegration(cfg) + assert integration.external_volume == "/mnt/data" + + +def test_unity_delta_without_uniform_raises(): + cfg = _Config(file_format="delta") + with pytest.raises(DbtValidationError, match="use_uniform: false.*requires file_format: parquet"): + UnityCatalogIntegration(cfg) + + +def test_unity_parquet_with_uniform_raises(): + cfg = _Config(file_format="parquet", adapter_properties={"use_uniform": True}) + with pytest.raises(DbtValidationError, match="use_uniform: true.*requires file_format: delta"): + UnityCatalogIntegration(cfg) + + +def test_unity_blank_location_root_raises(): + cfg = _Config(file_format="parquet", adapter_properties={"location_root": " "}) + with pytest.raises(DbtValidationError, match="location_root cannot be blank"): + UnityCatalogIntegration(cfg) + + +def test_unity_empty_location_root_raises(): + cfg = _Config(file_format="parquet", adapter_properties={"location_root": ""}) + with pytest.raises(DbtValidationError, match="location_root cannot be blank"): + UnityCatalogIntegration(cfg) + + +# ===== HiveMetastoreCatalogIntegration ===== + + +def test_hive_delta_valid(): + cfg = _Config(catalog_type="hive_metastore", file_format="delta") + integration = HiveMetastoreCatalogIntegration(cfg) + assert integration.file_format == "delta" + + +def test_hive_parquet_valid(): + cfg = _Config(catalog_type="hive_metastore", file_format="parquet") + integration = HiveMetastoreCatalogIntegration(cfg) + assert integration.file_format == "parquet" + + +def test_hive_hudi_valid(): + cfg = _Config(catalog_type="hive_metastore", file_format="hudi") + integration = HiveMetastoreCatalogIntegration(cfg) + assert integration.file_format == "hudi" + + +def test_hive_invalid_file_format_raises(): + cfg = _Config(catalog_type="hive_metastore", file_format="avro") + with pytest.raises(DbtValidationError, match="file_format"): + HiveMetastoreCatalogIntegration(cfg) \ No newline at end of file