From c69e0b5e8cab4d9f2c41c12cdfc50512c2f72b7d Mon Sep 17 00:00:00 2001 From: nssalian Date: Mon, 12 Jan 2026 20:21:40 -0800 Subject: [PATCH 1/3] Consolidating InMemoryCatalog and SqlCatalog tests --- tests/catalog/conftest.py | 183 +++ tests/catalog/test_base.py | 570 +-------- tests/catalog/test_catalog_behaviors.py | 1176 ++++++++++++++++++ tests/catalog/test_sql.py | 1509 +---------------------- tests/conftest.py | 13 +- tests/table/test_upsert.py | 4 +- 6 files changed, 1368 insertions(+), 2087 deletions(-) create mode 100644 tests/catalog/conftest.py create mode 100644 tests/catalog/test_catalog_behaviors.py diff --git a/tests/catalog/conftest.py b/tests/catalog/conftest.py new file mode 100644 index 0000000000..1d819af243 --- /dev/null +++ b/tests/catalog/conftest.py @@ -0,0 +1,183 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Pytest fixtures for parameterized catalog behavior tests.""" + +import os +from collections.abc import Generator +from pathlib import Path + +import pytest +from pytest_lazyfixture import lazy_fixture + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.typedef import Identifier + + +def _create_memory_catalog(name: str, warehouse: Path) -> InMemoryCatalog: + return InMemoryCatalog(name, warehouse=f"file://{warehouse}") + + +def _create_sql_catalog(name: str, warehouse: Path) -> SqlCatalog: + catalog = SqlCatalog( + name, + uri="sqlite:///:memory:", + warehouse=f"file://{warehouse}", + ) + catalog.create_tables() + return catalog + + +def _create_sql_without_rowcount_catalog(name: str, warehouse: Path) -> SqlCatalog: + props = { + "uri": f"sqlite:////{warehouse}/sql-catalog", + "warehouse": f"file://{warehouse}", + } + catalog = SqlCatalog(name, **props) + catalog.engine.dialect.supports_sane_rowcount = False + catalog.create_tables() + return catalog + + +_CATALOG_FACTORIES = { + "memory": _create_memory_catalog, + "sql": _create_sql_catalog, + "sql_without_rowcount": _create_sql_without_rowcount_catalog, +} + + +@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) +def catalog(request: pytest.FixtureRequest, tmp_path: Path) -> Generator[Catalog, None, None]: + """Parameterized fixture that yields catalogs listed in _CATALOG_FACTORIES.""" + catalog_type = request.param + factory = _CATALOG_FACTORIES[catalog_type] + cat = factory("test_catalog", tmp_path) + yield cat + if hasattr(cat, "destroy_tables"): + cat.destroy_tables() + + +@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) +def catalog_with_warehouse( + request: pytest.FixtureRequest, + warehouse: Path, +) -> Generator[Catalog, None, None]: + factory = _CATALOG_FACTORIES[request.param] + cat = factory("test_catalog", warehouse) + yield cat + if hasattr(cat, "destroy_tables"): + cat.destroy_tables() + + +@pytest.fixture(name="random_table_identifier") +def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: + os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) + return database_name, table_name + + +@pytest.fixture(name="another_random_table_identifier") +def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: + database_name = database_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) + return database_name, table_name + + +@pytest.fixture(name="random_hierarchical_identifier") +def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier: + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + +@pytest.fixture(name="another_random_hierarchical_identifier") +def fixture_another_random_hierarchical_identifier( + warehouse: Path, hierarchical_namespace_name: str, table_name: str +) -> Identifier: + hierarchical_namespace_name = hierarchical_namespace_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + +@pytest.fixture(scope="session") +def fixed_test_table_identifier() -> Identifier: + return "com", "organization", "department", "my_table" + + +@pytest.fixture(scope="session") +def another_fixed_test_table_identifier() -> Identifier: + return "com", "organization", "department_alt", "my_another_table" + + +@pytest.fixture(scope="session") +def fixed_test_table_namespace() -> Identifier: + return "com", "organization", "department" + + +@pytest.fixture( + scope="session", + params=[ + lazy_fixture("fixed_test_table_identifier"), + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + ], +) +def test_table_identifier(request) -> Identifier: + return request.param + + +@pytest.fixture( + scope="session", + params=[ + lazy_fixture("another_fixed_test_table_identifier"), + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + ], +) +def another_table_identifier(request) -> Identifier: + return request.param + + +@pytest.fixture( + params=[ + lazy_fixture("database_name"), + lazy_fixture("hierarchical_namespace_name"), + lazy_fixture("fixed_test_table_namespace"), + ], +) +def test_namespace(request) -> Identifier: + ns = request.param + if isinstance(ns, tuple): + return ns + if "." in ns: + return tuple(ns.split(".")) + return (ns,) + + +@pytest.fixture(scope="session") +def test_namespace_properties() -> dict[str, str]: + return {"key1": "value1", "key2": "value2"} + + +@pytest.fixture(scope="session") +def test_table_properties() -> dict[str, str]: + return { + "key1": "value1", + "key2": "value2", + } diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 42702c8c2b..27058f0c54 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -19,35 +19,13 @@ from pathlib import PosixPath -import pyarrow as pa import pytest -from pydantic_core import ValidationError -from pytest_lazyfixture import lazy_fixture -from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog import load_catalog from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.exceptions import ( - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, - NoSuchNamespaceError, - NoSuchTableError, - TableAlreadyExistsError, -) from pyiceberg.io import WAREHOUSE -from pyiceberg.io.pyarrow import schema_to_pyarrow -from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import ( - Table, - TableProperties, -) -from pyiceberg.table.update import ( - AddSchemaUpdate, - SetCurrentSchemaUpdate, -) -from pyiceberg.transforms import IdentityTransform -from pyiceberg.typedef import EMPTY_DICT, Properties -from pyiceberg.types import IntegerType, LongType, NestedField, StringType +from pyiceberg.types import NestedField, StringType @pytest.fixture @@ -55,38 +33,6 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog: return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}) -TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") -TEST_TABLE_NAMESPACE = ("com", "organization", "department") -TEST_TABLE_NAME = "my_table" -TEST_TABLE_SCHEMA = Schema( - NestedField(1, "x", LongType(), required=True), - NestedField(2, "y", LongType(), doc="comment", required=True), - NestedField(3, "z", LongType(), required=True), -) -TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) -TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} -NO_SUCH_TABLE_ERROR = "Table does not exist: com.organization.department.my_table" -TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table already exists" -NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization', 'department'\\) already exists" -# TODO: consolidate namespace error messages then remove this -DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)" -NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not exists" -NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not empty" - - -def given_catalog_has_a_table( - catalog: InMemoryCatalog, - properties: Properties = EMPTY_DICT, -) -> Table: - catalog.create_namespace(TEST_TABLE_NAMESPACE) - return catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=properties or TEST_TABLE_PROPERTIES, - ) - - def test_load_catalog_in_memory() -> None: assert load_catalog("catalog", type="in-memory") @@ -115,523 +61,11 @@ def test_load_catalog_has_type_and_impl() -> None: ) -def test_namespace_from_tuple() -> None: - # Given - identifier = ("com", "organization", "department", "my_table") - # When - namespace_from = Catalog.namespace_from(identifier) - # Then - assert namespace_from == ("com", "organization", "department") - - -def test_namespace_from_str() -> None: - # Given - identifier = "com.organization.department.my_table" - # When - namespace_from = Catalog.namespace_from(identifier) - # Then - assert namespace_from == ("com", "organization", "department") - - -def test_name_from_tuple() -> None: - # Given - identifier = ("com", "organization", "department", "my_table") - # When - name_from = Catalog.table_name_from(identifier) - # Then - assert name_from == "my_table" - - -def test_name_from_str() -> None: - # Given - identifier = "com.organization.department.my_table" - # When - name_from = Catalog.table_name_from(identifier) - # Then - assert name_from == "my_table" - - -def test_create_table(catalog: InMemoryCatalog) -> None: - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=TEST_TABLE_PROPERTIES, - ) - assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table - - -def test_create_table_location_override(catalog: InMemoryCatalog) -> None: - new_location = f"{catalog._warehouse_location}/new_location" - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=new_location, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=TEST_TABLE_PROPERTIES, - ) - assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table - assert table.location() == new_location - - -def test_create_table_removes_trailing_slash_from_location(catalog: InMemoryCatalog) -> None: - new_location = f"{catalog._warehouse_location}/new_location" - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=f"{new_location}/", - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=TEST_TABLE_PROPERTIES, - ) - assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table - assert table.location() == new_location - - -@pytest.mark.parametrize( - "schema,expected", - [ - (lazy_fixture("pyarrow_schema_simple_without_ids"), lazy_fixture("iceberg_schema_simple_no_ids")), - (lazy_fixture("iceberg_schema_simple"), lazy_fixture("iceberg_schema_simple")), - (lazy_fixture("iceberg_schema_nested"), lazy_fixture("iceberg_schema_nested")), - (lazy_fixture("pyarrow_schema_nested_without_ids"), lazy_fixture("iceberg_schema_nested_no_ids")), - ], -) -def test_convert_schema_if_needed( - schema: Schema | pa.Schema, - expected: Schema, - catalog: InMemoryCatalog, -) -> None: - assert expected == catalog._convert_schema_if_needed(schema) - - -def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None: - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=pyarrow_schema_simple_without_ids, - properties=TEST_TABLE_PROPERTIES, - ) - assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table - - -def test_create_table_raises_error_when_table_already_exists(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # When - with pytest.raises(TableAlreadyExistsError, match=TABLE_ALREADY_EXISTS_ERROR): - catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - ) - - -def test_load_table(catalog: InMemoryCatalog) -> None: - # Given - given_table = given_catalog_has_a_table(catalog) - # When - table = catalog.load_table(TEST_TABLE_IDENTIFIER) - # Then - assert table == given_table - - -def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None: - # Given - given_table = given_catalog_has_a_table(catalog) - # When - intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER) - table = catalog.load_table(intermediate._identifier) - # Then - assert table == given_table - - -def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None: - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_table_exists(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # Then - assert catalog.table_exists(TEST_TABLE_IDENTIFIER) - - -def test_table_exists_on_table_not_found(catalog: InMemoryCatalog) -> None: - assert not catalog.table_exists(TEST_TABLE_IDENTIFIER) - - -def test_drop_table(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # When - catalog.drop_table(TEST_TABLE_IDENTIFIER) - # Then - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None: - # Given - table = given_catalog_has_a_table(catalog) - # When - catalog.drop_table(table._identifier) - # Then - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(table._identifier) - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None: - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_purge_table(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # When - catalog.purge_table(TEST_TABLE_IDENTIFIER) - # Then - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_rename_table(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - - # When - new_table = "new.namespace.new_table" - catalog.create_namespace(("new", "namespace")) - table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table) - - # Then - assert table._identifier == Catalog.identifier_to_tuple(new_table) - - # And - table = catalog.load_table(new_table) - assert table._identifier == Catalog.identifier_to_tuple(new_table) - - # And - assert catalog._namespace_exists(table._identifier[:-1]) - - # And - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None: - # Given - table = given_catalog_has_a_table(catalog) - - # When - new_table_name = "new.namespace.new_table" - catalog.create_namespace(("new", "namespace")) - new_table = catalog.rename_table(table._identifier, new_table_name) - - # Then - assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name) - - # And - new_table = catalog.load_table(new_table._identifier) - assert new_table._identifier == Catalog.identifier_to_tuple(new_table_name) - - # And - assert catalog._namespace_exists(new_table._identifier[:-1]) - - # And - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(table._identifier) - with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): - catalog.load_table(TEST_TABLE_IDENTIFIER) - - -def test_create_namespace(catalog: InMemoryCatalog) -> None: - # When - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - - # Then - assert catalog._namespace_exists(TEST_TABLE_NAMESPACE) - assert TEST_TABLE_PROPERTIES == catalog.load_namespace_properties(TEST_TABLE_NAMESPACE) - - -def test_create_namespace_raises_error_on_existing_namespace(catalog: InMemoryCatalog) -> None: - # Given - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - # When - with pytest.raises(NamespaceAlreadyExistsError, match=NAMESPACE_ALREADY_EXISTS_ERROR): - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - - -def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None: - with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR): - catalog.load_namespace_properties(TEST_TABLE_NAMESPACE) - - -def test_list_namespaces(catalog: InMemoryCatalog) -> None: - # Given - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - # When - namespaces = catalog.list_namespaces() - # Then - assert TEST_TABLE_NAMESPACE[:1] in namespaces - - # When - namespaces = catalog.list_namespaces(TEST_TABLE_NAMESPACE) - # Then - assert not namespaces - - -def test_drop_namespace(catalog: InMemoryCatalog) -> None: - # Given - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - # When - catalog.drop_namespace(TEST_TABLE_NAMESPACE) - # Then - assert not catalog._namespace_exists(TEST_TABLE_NAMESPACE) - - -def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None: - with pytest.raises(NoSuchNamespaceError, match=DROP_NOT_EXISTING_NAMESPACE_ERROR): - catalog.drop_namespace(TEST_TABLE_NAMESPACE) - - -def test_drop_namespace_raises_error_when_namespace_not_empty(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # When - with pytest.raises(NamespaceNotEmptyError, match=NAMESPACE_NOT_EMPTY_ERROR): - catalog.drop_namespace(TEST_TABLE_NAMESPACE) - - -def test_list_tables(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - # When - tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE) - # Then - assert tables - assert TEST_TABLE_IDENTIFIER in tables - - -def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None: - # Given - given_catalog_has_a_table(catalog) - new_namespace = ("new", "namespace") - catalog.create_namespace(new_namespace) - # When - all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE) - new_namespace_tables = catalog.list_tables(new_namespace) - # Then - assert all_tables - assert TEST_TABLE_IDENTIFIER in all_tables - assert new_namespace_tables == [] - - -def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None: - # Given - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - - # When - new_metadata = {"key3": "value3", "key4": "value4"} - summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, updates=new_metadata) - - # Then - assert catalog._namespace_exists(TEST_TABLE_NAMESPACE) - assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items() - assert summary.removed == [] - assert sorted(summary.updated) == ["key3", "key4"] - assert summary.missing == [] - - -def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None: - # Given - catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) - - # When - new_metadata = {"key3": "value3", "key4": "value4"} - remove_metadata = {"key1"} - summary = catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, remove_metadata, new_metadata) - - # Then - assert catalog._namespace_exists(TEST_TABLE_NAMESPACE) - assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items() - assert remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys()) - assert summary.removed == ["key1"] - assert sorted(summary.updated) == ["key3", "key4"] - assert summary.missing == [] - - -def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None: - with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR): - catalog.update_namespace_properties(TEST_TABLE_NAMESPACE, updates=TEST_TABLE_PROPERTIES) - - -def test_commit_table(catalog: InMemoryCatalog) -> None: - # Given - given_table = given_catalog_has_a_table(catalog) - new_schema = Schema( - NestedField(1, "x", LongType()), - NestedField(2, "y", LongType(), doc="comment"), - NestedField(3, "z", LongType()), - NestedField(4, "add", LongType()), - ) - - # When - response = given_table.catalog.commit_table( - given_table, - updates=( - AddSchemaUpdate(schema=new_schema), - SetCurrentSchemaUpdate(schema_id=-1), - ), - requirements=(), - ) - - # Then - assert response.metadata.table_uuid == given_table.metadata.table_uuid - assert len(response.metadata.schemas) == 2 - assert response.metadata.schemas[1] == new_schema - assert response.metadata.current_schema_id == new_schema.schema_id - - -def test_add_column(catalog: InMemoryCatalog) -> None: - given_table = given_catalog_has_a_table(catalog) - - given_table.update_schema().add_column(path="new_column1", field_type=IntegerType()).commit() - - assert given_table.schema() == Schema( - NestedField(field_id=1, name="x", field_type=LongType(), required=True), - NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), - NestedField(field_id=3, name="z", field_type=LongType(), required=True), - NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), - identifier_field_ids=[], - ) - assert given_table.schema().schema_id == 1 - - transaction = given_table.transaction() - transaction.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit() - transaction.commit_transaction() - - assert given_table.schema() == Schema( - NestedField(field_id=1, name="x", field_type=LongType(), required=True), - NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), - NestedField(field_id=3, name="z", field_type=LongType(), required=True), - NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), - NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"), - identifier_field_ids=[], - ) - assert given_table.schema().schema_id == 2 - - -def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: - given_table = given_catalog_has_a_table(catalog) - - with given_table.update_schema() as tx: - tx.add_column(path="new_column1", field_type=IntegerType()) - - assert given_table.schema() == Schema( - NestedField(field_id=1, name="x", field_type=LongType(), required=True), - NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), - NestedField(field_id=3, name="z", field_type=LongType(), required=True), - NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), - identifier_field_ids=[], - ) - assert given_table.schema().schema_id == 1 - - with given_table.transaction() as tx: - tx.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit() - - assert given_table.schema() == Schema( - NestedField(field_id=1, name="x", field_type=LongType(), required=True), - NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), - NestedField(field_id=3, name="z", field_type=LongType(), required=True), - NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), - NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"), - identifier_field_ids=[], - ) - assert given_table.schema().schema_id == 2 - - def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) assert s == "test.in_memory.catalog ()" -def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: - # table properties can be set to int, but still serialized to string - property_with_int = {"property_name": 42} - given_table = given_catalog_has_a_table(catalog, properties=property_with_int) - assert isinstance(given_table.properties["property_name"], str) - - -def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None: - property_with_none = {"property_name": None} - with pytest.raises(ValidationError) as exc_info: - _ = given_catalog_has_a_table(catalog, properties=property_with_none) - assert "None type is not a supported value in properties: property_name" in str(exc_info.value) - - -def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None: - metadata_path = f"{catalog._warehouse_location}/custom/path" - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties={TableProperties.WRITE_METADATA_PATH: metadata_path}, - ) - df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA)) - table.append(df) - manifests = table.current_snapshot().manifests(table.io) # type: ignore - location_provider = table.location_provider() - - assert location_provider.new_metadata_location("").startswith(metadata_path) - assert manifests[0].manifest_path.startswith(metadata_path) - assert table.location() != metadata_path - assert table.metadata_location.startswith(metadata_path) - - -def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None: - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - properties=TEST_TABLE_PROPERTIES, - ) - metadata_path = f"{table.location()}/metadata" - df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA)) - table.append(df) - manifests = table.current_snapshot().manifests(table.io) # type: ignore - location_provider = table.location_provider() - - assert location_provider.new_metadata_location("").startswith(metadata_path) - assert manifests[0].manifest_path.startswith(metadata_path) - assert table.metadata_location.startswith(metadata_path) - - -def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None: - catalog.create_namespace(TEST_TABLE_NAMESPACE) - table = catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - partition_spec=TEST_TABLE_PARTITION_SPEC, - ) - - initial_metadata_path = f"{table.location()}/metadata" - assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json" - - # update table with new path for metadata - new_metadata_path = f"{table.location()}/custom/path" - table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction() - - assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json" - - class TestCatalogClose: """Test catalog close functionality.""" diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py new file mode 100644 index 0000000000..fe59329e79 --- /dev/null +++ b/tests/catalog/test_catalog_behaviors.py @@ -0,0 +1,1176 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Consolidated behavior tests for InMemoryCatalog and SqlCatalog. +""" + +import os +from pathlib import Path +from typing import Any + +import pyarrow as pa +import pytest +from pydantic_core import ValidationError +from pytest_lazyfixture import lazy_fixture +from sqlalchemy.exc import IntegrityError + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import ( + CommitFailedException, + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) +from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import AddSchemaUpdate, SetCurrentSchemaUpdate, TableProperties +from pyiceberg.table.snapshots import Operation +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Identifier +from pyiceberg.types import BooleanType, IntegerType, LongType, NestedField, StringType + + +# Name parsing tests +def test_namespace_from_tuple() -> None: + identifier = ("com", "organization", "department", "my_table") + namespace_from = Catalog.namespace_from(identifier) + assert namespace_from == ("com", "organization", "department") + + +def test_namespace_from_str() -> None: + identifier = "com.organization.department.my_table" + namespace_from = Catalog.namespace_from(identifier) + assert namespace_from == ("com", "organization", "department") + + +def test_name_from_tuple() -> None: + identifier = ("com", "organization", "department", "my_table") + name_from = Catalog.table_name_from(identifier) + assert name_from == "my_table" + + +def test_name_from_str() -> None: + identifier = "com.organization.department.my_table" + name_from = Catalog.table_name_from(identifier) + assert name_from == "my_table" + + +# Create table tests +def test_create_table(catalog: Catalog, test_table_identifier: Identifier, table_schema_simple: Schema) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_simple) + loaded = catalog.load_table(test_table_identifier) + assert loaded.name() == table.name() + assert loaded.metadata_location == table.metadata_location + + +def test_create_table_if_not_exists_duplicated_table( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table1 = catalog.create_table(test_table_identifier, table_schema_nested) + table2 = catalog.create_table_if_not_exists(test_table_identifier, table_schema_nested) + assert table1.name() == table2.name() + + +def test_create_table_raises_error_when_table_already_exists( + catalog: Catalog, test_table_identifier: Identifier, table_schema_nested: Schema +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_nested) + with pytest.raises(TableAlreadyExistsError): + catalog.create_table(test_table_identifier, table_schema_nested) + + +def test_table_exists(catalog: Catalog, test_table_identifier: Identifier, table_schema_nested: Schema) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_nested, properties={"format-version": "2"}) + assert catalog.table_exists(test_table_identifier) + + +def test_table_exists_on_table_not_found(catalog: Catalog, test_table_identifier: Identifier) -> None: + assert not catalog.table_exists(test_table_identifier) + + +def test_create_table_raises_error_when_namespace_does_not_exist(catalog: Catalog, table_schema_simple: Schema) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.create_table(("non_existent_ns", "table"), table_schema_simple) + + +def test_table_raises_error_on_table_not_found(catalog: Catalog, test_table_identifier: Identifier) -> None: + identifier_str = ".".join(test_table_identifier) + with pytest.raises(NoSuchTableError, match=f"Table does not exist: {identifier_str}"): + catalog.load_table(test_table_identifier) + + +def test_create_table_without_namespace(catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.create_table(table_name, table_schema_nested) + + +@pytest.mark.parametrize("format_version", [1, 2]) +def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: + identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + pa_table = pa.Table.from_pydict( + { + "foo": ["a", None, "z"], + }, + schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]), + ) + + pa_table_with_column = pa.Table.from_pydict( + { + "foo": ["a", None, "z"], + "bar": [19, None, 25], + }, + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=True), + ] + ), + ) + + with catalog.create_table_transaction( + identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)} + ) as txn: + with txn.update_snapshot().fast_append() as snapshot_update: + for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io): + snapshot_update.append_data_file(data_file) + + with txn.update_schema() as schema_txn: + schema_txn.union_by_name(pa_table_with_column.schema) + + with txn.update_snapshot().fast_append() as snapshot_update: + for data_file in _dataframe_to_data_files( + table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io + ): + snapshot_update.append_data_file(data_file) + + tbl = catalog.load_table(identifier=identifier) + assert tbl.format_version == format_version + assert len(tbl.scan().to_arrow()) == 6 + + +def test_create_table_default_sort_order( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.sort_order().order_id == 0, "Order ID must match" + assert table.sort_order().is_unsorted is True, "Order must be unsorted" + catalog.drop_table(test_table_identifier) + + +def test_create_v1_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested, properties={"format-version": "1"}) + assert table.sort_order().order_id == 0, "Order ID must match" + assert table.sort_order().is_unsorted is True, "Order must be unsorted" + assert table.format_version == 1 + assert table.spec() == UNPARTITIONED_PARTITION_SPEC + catalog.drop_table(test_table_identifier) + + +def test_create_table_custom_sort_order(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)) + table = catalog.create_table(test_table_identifier, table_schema_nested, sort_order=order) + given_sort_order = table.sort_order() + assert given_sort_order.order_id == 1, "Order ID must match" + assert len(given_sort_order.fields) == 1, "Order must have 1 field" + assert given_sort_order.fields[0].direction == SortDirection.ASC, "Direction must match" + assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, "Null order must match" + assert isinstance(given_sort_order.fields[0].transform, IdentityTransform), "Transform must match" + catalog.drop_table(test_table_identifier) + + +def test_create_table_with_default_warehouse_location( + warehouse: Path, catalog_with_warehouse: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + catalog_with_warehouse.create_namespace(namespace) + catalog_with_warehouse.create_table(test_table_identifier, table_schema_nested) + table = catalog_with_warehouse.load_table(test_table_identifier) + assert table.name() == identifier_tuple + assert table.metadata_location.startswith(f"file://{warehouse}") + assert os.path.exists(table.metadata_location[len("file://") :]) + catalog_with_warehouse.drop_table(test_table_identifier) + + +def test_create_table_location_override( + catalog: Catalog, tmp_path: Path, table_schema_nested: Schema, test_table_identifier: Identifier, test_table_properties: dict +) -> None: + test_partition_spec = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) + new_location = f"file://{tmp_path}/new_location" + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=test_table_identifier, + schema=table_schema_nested, + location=new_location, + partition_spec=test_partition_spec, + properties=test_table_properties, + ) + assert catalog.load_table(test_table_identifier) == table + assert table.location() == new_location + + +def test_create_table_removes_trailing_slash_from_location( + warehouse: Path, catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + table_name = Catalog.table_name_from(identifier_tuple) + location = f"file://{warehouse}/{catalog.name}/{table_name}-given" + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_nested, location=f"{location}/") + table = catalog.load_table(test_table_identifier) + assert table.name() == identifier_tuple + assert table.metadata_location.startswith(f"file://{warehouse}") + assert os.path.exists(table.metadata_location[len("file://") :]) + assert table.location() == location + catalog.drop_table(test_table_identifier) + + +def test_create_tables_idempotency(catalog: Catalog) -> None: + # Second initialization should not fail even if tables are already created + catalog.create_tables() + catalog.create_tables() + + +def test_create_table_pyarrow_schema( + catalog: Catalog, pyarrow_schema_simple_without_ids: pa.Schema, test_table_identifier: Identifier, test_table_properties: dict +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=test_table_identifier, + schema=pyarrow_schema_simple_without_ids, + properties=test_table_properties, + ) + assert catalog.load_table(test_table_identifier) == table + + +def test_write_pyarrow_schema(catalog: Catalog, test_table_identifier: Identifier) -> None: + pyarrow_table = pa.Table.from_arrays( + [ + pa.array([None, "A", "B", "C"]), # 'foo' column + pa.array([1, 2, 3, 4]), # 'bar' column + pa.array([True, None, False, True]), # 'baz' column + pa.array([None, "A", "B", "C"]), # 'large' column + ], + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=False), + pa.field("baz", pa.bool_(), nullable=True), + pa.field("large", pa.large_string(), nullable=True), + ] + ), + ) + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, pyarrow_table.schema) + table.append(pyarrow_table) + + +@pytest.mark.parametrize( + "schema,expected", + [ + (lazy_fixture("pyarrow_schema_simple_without_ids"), lazy_fixture("iceberg_schema_simple_no_ids")), + (lazy_fixture("table_schema_simple"), lazy_fixture("table_schema_simple")), + (lazy_fixture("table_schema_nested"), lazy_fixture("table_schema_nested")), + (lazy_fixture("pyarrow_schema_nested_without_ids"), lazy_fixture("iceberg_schema_nested_no_ids")), + ], +) +def test_convert_schema_if_needed( + schema: Schema | pa.Schema, + expected: Schema, + catalog: Catalog, +) -> None: + assert expected == catalog._convert_schema_if_needed(schema) + + +# Register table tests + + +def test_register_table(catalog: Catalog, test_table_identifier: Identifier, metadata_location: str) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.register_table(test_table_identifier, metadata_location) + assert table.name() == identifier_tuple + assert table.metadata_location == metadata_location + assert os.path.exists(metadata_location) + catalog.drop_table(test_table_identifier) + + +def test_register_existing_table(catalog: Catalog, test_table_identifier: Identifier, metadata_location: str) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.register_table(test_table_identifier, metadata_location) + with pytest.raises(TableAlreadyExistsError): + catalog.register_table(test_table_identifier, metadata_location) + + +# Load table tests + + +def test_load_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + loaded_table = catalog.load_table(test_table_identifier) + assert table.name() == loaded_table.name() + assert table.metadata_location == loaded_table.metadata_location + assert table.metadata == loaded_table.metadata + + +def test_load_table_from_self_identifier( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + intermediate = catalog.load_table(test_table_identifier) + assert intermediate.name() == identifier_tuple + loaded_table = catalog.load_table(intermediate.name()) + assert table.name() == loaded_table.name() + assert table.metadata_location == loaded_table.metadata_location + assert table.metadata == loaded_table.metadata + + +# Rename table tests + + +def test_rename_table( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier +) -> None: + from_namespace = Catalog.namespace_from(test_table_identifier) + to_namespace = Catalog.namespace_from(another_table_identifier) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == test_table_identifier + catalog.rename_table(test_table_identifier, another_table_identifier) + new_table = catalog.load_table(another_table_identifier) + assert new_table.name() == another_table_identifier + assert new_table.metadata_location == table.metadata_location + with pytest.raises(NoSuchTableError): + catalog.load_table(test_table_identifier) + + +def test_rename_table_from_self_identifier( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier +) -> None: + from_namespace = Catalog.namespace_from(test_table_identifier) + to_namespace = Catalog.namespace_from(another_table_identifier) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == test_table_identifier + catalog.rename_table(table.name(), another_table_identifier) + new_table = catalog.load_table(another_table_identifier) + assert new_table.name() == another_table_identifier + assert new_table.metadata_location == table.metadata_location + with pytest.raises(NoSuchTableError): + catalog.load_table(table.name()) + with pytest.raises(NoSuchTableError): + catalog.load_table(test_table_identifier) + + +def test_rename_table_to_existing_one( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier +) -> None: + from_namespace = Catalog.namespace_from(test_table_identifier) + to_namespace = Catalog.namespace_from(another_table_identifier) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == test_table_identifier + new_table = catalog.create_table(another_table_identifier, table_schema_nested) + assert new_table.name() == another_table_identifier + with pytest.raises(TableAlreadyExistsError): + catalog.rename_table(test_table_identifier, another_table_identifier) + + +def test_rename_missing_table(catalog: Catalog, test_table_identifier: Identifier, another_table_identifier: Identifier) -> None: + from_namespace = Catalog.namespace_from(test_table_identifier) + to_namespace = Catalog.namespace_from(another_table_identifier) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + with pytest.raises(NoSuchTableError): + catalog.rename_table(test_table_identifier, another_table_identifier) + + +def test_rename_table_to_missing_namespace( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier +) -> None: + from_namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(from_namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == test_table_identifier + with pytest.raises(NoSuchNamespaceError): + catalog.rename_table(test_table_identifier, another_table_identifier) + + +# Drop table tests + + +def test_drop_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == identifier_tuple + catalog.drop_table(test_table_identifier) + with pytest.raises(NoSuchTableError): + catalog.load_table(test_table_identifier) + + +def test_drop_table_from_self_identifier( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + identifier_tuple = Catalog.identifier_to_tuple(test_table_identifier) + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + assert table.name() == identifier_tuple + catalog.drop_table(table.name()) + with pytest.raises(NoSuchTableError): + catalog.load_table(table.name()) + with pytest.raises(NoSuchTableError): + catalog.load_table(test_table_identifier) + + +def test_drop_table_that_does_not_exist_raise_error(catalog: Catalog, test_table_identifier: Identifier) -> None: + with pytest.raises(NoSuchTableError): + catalog.drop_table(test_table_identifier) + + +def test_purge_table(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_simple) + catalog.purge_table(test_table_identifier) + with pytest.raises(NoSuchTableError, match=f"Table does not exist: {'.'.join(test_table_identifier)}"): + catalog.load_table(test_table_identifier) + + +# List tables tests + + +def test_list_tables( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier, another_table_identifier: Identifier +) -> None: + namespace_1 = Catalog.namespace_from(test_table_identifier) + namespace_2 = Catalog.namespace_from(another_table_identifier) + catalog.create_namespace(namespace_1) + catalog.create_namespace(namespace_2) + catalog.create_table(test_table_identifier, table_schema_nested) + catalog.create_table(another_table_identifier, table_schema_nested) + identifier_list = catalog.list_tables(namespace_1) + assert len(identifier_list) == 1 + assert test_table_identifier in identifier_list + + identifier_list = catalog.list_tables(namespace_2) + assert len(identifier_list) == 1 + assert another_table_identifier in identifier_list + + +def test_list_tables_under_a_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_nested) + new_namespace = ("new", "namespace") + catalog.create_namespace(new_namespace) + all_tables = catalog.list_tables(namespace=namespace) + new_namespace_tables = catalog.list_tables(new_namespace) + assert all_tables + assert test_table_identifier in all_tables + assert new_namespace_tables == [] + + +def test_list_tables_when_missing_namespace(catalog: Catalog, test_namespace: Identifier) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.list_tables(test_namespace) + + +# Commit table tests +def test_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + assert catalog._parse_metadata_version(table.metadata_location) == 0 + assert table.metadata.current_schema_id == 0 + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + + assert catalog._parse_metadata_version(table.metadata_location) == 1 + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1) + assert new_schema + assert new_schema == update._apply() + assert new_schema.find_field("b").field_type == IntegerType() + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert len(updated_table_metadata.metadata_log) == 1 + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms + + +def test_catalog_commit_table_applies_schema_updates( + catalog: Catalog, + table_schema_nested: Schema, + test_table_identifier: Identifier, +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_nested) + + new_schema = Schema( + NestedField(1, "x", LongType()), + NestedField(2, "y", LongType(), doc="comment"), + NestedField(3, "z", LongType()), + NestedField(4, "add", LongType()), + ) + + response = table.catalog.commit_table( + table, + updates=( + AddSchemaUpdate(schema=new_schema), + SetCurrentSchemaUpdate(), + ), + requirements=(), + ) + assert response.metadata.table_uuid == table.metadata.table_uuid + assert len(response.metadata.schemas) == 2 + assert response.metadata.schemas[1] == new_schema + assert response.metadata.current_schema_id == new_schema.schema_id + + +def test_concurrent_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table_a = catalog.create_table(test_table_identifier, table_schema_nested) + table_b = catalog.load_table(test_table_identifier) + + with table_a.update_schema() as update: + update.add_column(path="b", field_type=IntegerType()) + + with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"): + # This one should fail since it already has been updated + with table_b.update_schema() as update: + update.add_column(path="c", field_type=IntegerType()) + + +def test_delete_metadata_multiple(catalog: Catalog, table_schema_nested: Schema, random_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(random_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(random_table_identifier, table_schema_nested) + + original_metadata_location = table.metadata_location + + for i in range(5): + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path=f"new_column_{i}", field_type=IntegerType()) + + assert len(table.metadata.metadata_log) == 5 + assert os.path.exists(original_metadata_location[len("file://") :]) + + # Set the max versions property to 2, and delete after commit + new_property = { + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2", + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true", + } + + with table.transaction() as transaction: + transaction.set_properties(properties=new_property) + + # Verify that only the most recent metadata files are kept + assert len(table.metadata.metadata_log) == 2 + updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log + + # new metadata log was added, so earlier metadata logs are removed. + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path="new_column_x", field_type=IntegerType()) + + assert len(table.metadata.metadata_log) == 2 + assert not os.path.exists(original_metadata_location[len("file://") :]) + assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :]) + assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :]) + + +# Table properties tests + + +def test_table_properties_int_value(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None: + # table properties can be set to int, but still serialized to string + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + property_with_int = {"property_name": 42} + table = catalog.create_table(test_table_identifier, table_schema_simple, properties=property_with_int) + assert isinstance(table.properties["property_name"], str) + + +def test_table_properties_raise_for_none_value( + catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + property_with_none = {"property_name": None} + with pytest.raises(ValidationError) as exc_info: + _ = catalog.create_table(test_table_identifier, table_schema_simple, properties=property_with_none) + assert "None type is not a supported value in properties: property_name" in str(exc_info.value) + + +# Append table + + +def test_append_table(catalog: Catalog, table_schema_simple: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(test_table_identifier, table_schema_simple) + + df = pa.Table.from_pydict( + { + "foo": ["a"], + "bar": [1], + "baz": [True], + }, + schema=schema_to_pyarrow(table_schema_simple), + ) + + table.append(df) + + # new snapshot is written in APPEND mode + assert len(table.metadata.snapshots) == 1 + assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id + assert table.metadata.snapshots[0].parent_snapshot_id is None + assert table.metadata.snapshots[0].sequence_number == 1 + assert table.metadata.snapshots[0].summary is not None + assert table.metadata.snapshots[0].summary.operation == Operation.APPEND + assert table.metadata.snapshots[0].summary["added-data-files"] == "1" + assert table.metadata.snapshots[0].summary["added-records"] == "1" + assert table.metadata.snapshots[0].summary["total-data-files"] == "1" + assert table.metadata.snapshots[0].summary["total-records"] == "1" + assert len(table.metadata.metadata_log) == 1 + + # read back the data + assert df == table.scan().to_arrow() + + +# Test writes +def test_table_writes_metadata_to_custom_location( + catalog: Catalog, + test_table_identifier: Identifier, + table_schema_simple: Schema, + warehouse: Path, +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + metadata_path = f"file://{warehouse}/custom/path" + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=test_table_identifier, + schema=table_schema_simple, + properties={TableProperties.WRITE_METADATA_PATH: metadata_path}, + ) + df = pa.Table.from_pydict( + {"foo": ["a"], "bar": [1], "baz": [True]}, + schema=schema_to_pyarrow(table_schema_simple), + ) + table.append(df) + manifests = table.current_snapshot().manifests(table.io) + location_provider = table.location_provider() + + assert location_provider.new_metadata_location("").startswith(metadata_path) + assert manifests[0].manifest_path.startswith(metadata_path) + assert table.location() != metadata_path + assert table.metadata_location.startswith(metadata_path) + + +def test_table_writes_metadata_to_default_path( + catalog: Catalog, + test_table_identifier: Identifier, + table_schema_simple: Schema, + test_table_properties: dict, +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=test_table_identifier, + schema=table_schema_simple, + properties=test_table_properties, + ) + metadata_path = f"{table.location()}/metadata" + df = pa.Table.from_pydict( + {"foo": ["a"], "bar": [1], "baz": [True]}, + schema=schema_to_pyarrow(table_schema_simple), + ) + table.append(df) + manifests = table.current_snapshot().manifests(table.io) + location_provider = table.location_provider() + + assert location_provider.new_metadata_location("").startswith(metadata_path) + assert manifests[0].manifest_path.startswith(metadata_path) + assert table.metadata_location.startswith(metadata_path) + + +def test_table_metadata_writes_reflect_latest_path( + catalog: Catalog, + test_table_identifier: Identifier, + table_schema_simple: Schema, + warehouse: Path, +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table( + identifier=test_table_identifier, + schema=table_schema_simple, + ) + + initial_metadata_path = f"{table.location()}/metadata" + assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json" + + # update table with new path for metadata + new_metadata_path = f"file://{warehouse}/custom/path" + table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction() + + assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json" + + +@pytest.mark.parametrize("format_version", [1, 2]) +def test_write_and_evolve(catalog: Catalog, format_version: int) -> None: + identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}" + + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + pa_table = pa.Table.from_pydict( + { + "foo": ["a", None, "z"], + }, + schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]), + ) + + tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}) + + pa_table_with_column = pa.Table.from_pydict( + { + "foo": ["a", None, "z"], + "bar": [19, None, 25], + }, + schema=pa.schema( + [ + pa.field("foo", pa.large_string(), nullable=True), + pa.field("bar", pa.int32(), nullable=True), + ] + ), + ) + + with tbl.transaction() as txn: + with txn.update_schema() as schema_txn: + schema_txn.union_by_name(pa_table_with_column.schema) + + txn.append(pa_table_with_column) + txn.overwrite(pa_table_with_column) + txn.delete("foo = 'a'") + + +# Merge manifests +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifests_local_file_system(catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + # To catch manifest file name collision bug during merge: + # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918 + catalog.create_namespace_if_not_exists("default") + try: + catalog.drop_table("default.test_merge_manifest") + except NoSuchTableError: + pass + tbl = catalog.create_table( + "default.test_merge_manifest", + arrow_table_with_null.schema, + properties={ + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "2", + "format-version": format_version, + }, + ) + + for _ in range(5): + tbl.append(arrow_table_with_null) + + assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null) + current_snapshot = tbl.current_snapshot() + assert current_snapshot + manifests = current_snapshot.manifests(tbl.io) + assert len(manifests) == 1 + + +# Add column to table + + +def test_add_column(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(random_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(random_table_identifier, table_schema_simple) + table.update_schema().add_column(path="new_column1", field_type=IntegerType()).commit() + assert table.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), + schema_id=1, + identifier_field_ids=[2], + ) + assert table.schema().schema_id == 1 + + transaction = table.transaction() + transaction.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit() + transaction.commit_transaction() + + assert table.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), + NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"), + identifier_field_ids=[2], + ) + assert table.schema().schema_id == 2 + + +def test_add_column_with_statement(catalog: Catalog, table_schema_simple: Schema, random_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(random_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(random_table_identifier, table_schema_simple) + + with table.update_schema() as tx: + tx.add_column(path="new_column1", field_type=IntegerType()) + + assert table.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), + identifier_field_ids=[2], + ) + assert table.schema().schema_id == 1 + + with table.transaction() as tx: + tx.update_schema().add_column(path="new_column2", field_type=IntegerType(), doc="doc").commit() + + assert table.schema() == Schema( + NestedField(field_id=1, name="foo", field_type=StringType(), required=False), + NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), + NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), + NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False), + NestedField(field_id=5, name="new_column2", field_type=IntegerType(), required=False, doc="doc"), + identifier_field_ids=[2], + ) + assert table.schema().schema_id == 2 + + +# Namespace tests + + +def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict) -> None: + catalog.create_namespace(test_namespace, test_table_properties) + assert catalog._namespace_exists(test_namespace) + assert (Catalog.identifier_to_tuple(test_namespace)[:1]) in catalog.list_namespaces() + assert test_table_properties == catalog.load_namespace_properties(test_namespace) + + +def test_create_namespace_raises_error_on_existing_namespace( + catalog: Catalog, test_namespace: Identifier, test_table_properties: dict +) -> None: + catalog.create_namespace(test_namespace, test_table_properties) + with pytest.raises(NamespaceAlreadyExistsError): + catalog.create_namespace(test_namespace, test_table_properties) + + +def test_create_namespace_if_not_exists(catalog: Catalog, database_name: str) -> None: + catalog.create_namespace(database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.create_namespace_if_not_exists(database_name) + assert (database_name,) in catalog.list_namespaces() + + +def test_create_namespaces_sharing_same_prefix(catalog: Catalog, test_namespace: Identifier) -> None: + child_namespace = test_namespace + ("child",) + # Parent first + catalog.create_namespace(test_namespace) + # Then child + catalog.create_namespace(child_namespace) + + +def test_create_namespace_with_comment_and_location(catalog: Catalog, test_namespace: Identifier) -> None: + test_location = "/test/location" + test_properties = { + "comment": "this is a test description", + "location": test_location, + } + catalog.create_namespace(namespace=test_namespace, properties=test_properties) + loaded_database_list = catalog.list_namespaces() + assert Catalog.identifier_to_tuple(test_namespace)[:1] in loaded_database_list + properties = catalog.load_namespace_properties(test_namespace) + assert properties["comment"] == "this is a test description" + assert properties["location"] == test_location + + +@pytest.mark.filterwarnings("ignore") +def test_create_namespace_with_null_properties(catalog: Catalog, test_namespace: Identifier) -> None: + with pytest.raises(IntegrityError): + catalog.create_namespace(namespace=test_namespace, properties={None: "value"}) # type: ignore + + with pytest.raises(IntegrityError): + catalog.create_namespace(namespace=test_namespace, properties={"key": None}) + + +@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", (" ")]) +def test_create_namespace_with_empty_identifier(catalog: Catalog, empty_namespace: Any) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.create_namespace(empty_namespace) + + +# Get namespace tests + + +def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: Catalog, test_namespace: Identifier) -> None: + namespace = ".".join(test_namespace) + with pytest.raises(NoSuchNamespaceError, match=f"Namespace {namespace} does not exists"): + catalog.load_namespace_properties(test_namespace) + + +def test_namespace_exists(catalog: Catalog) -> None: + for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2")]: + catalog.create_namespace(ns) + assert catalog._namespace_exists(ns) + + assert catalog._namespace_exists("db2") # `db2` exists because `db2.ns1` exists + assert catalog._namespace_exists("db3.ns1") # `db3.ns1` exists because `db3.ns1.ns2` exists + assert not catalog._namespace_exists("db_") # make sure '_' is escaped in the query + assert not catalog._namespace_exists("db%") # make sure '%' is escaped in the query + + +# Namespace properties + + +def test_load_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None: + warehouse_location = "/test/location" + test_properties = { + "comment": "this is a test description", + "location": f"{warehouse_location}/{test_namespace}", + "test_property1": "1", + "test_property2": "2", + "test_property3": "3", + } + + catalog.create_namespace(test_namespace, test_properties) + listed_properties = catalog.load_namespace_properties(test_namespace) + for k, v in listed_properties.items(): + assert k in test_properties + assert v == test_properties[k] + + +def test_load_namespace_properties_non_existing_namespace(catalog: Catalog) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.load_namespace_properties("does_not_exist") + + +def test_load_empty_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None: + catalog.create_namespace(test_namespace) + listed_properties = catalog.load_namespace_properties(test_namespace) + assert listed_properties == {"exists": "true"} + + +# List namespaces tests + + +def test_list_namespaces(catalog: Catalog) -> None: + namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%"] + for namespace in namespace_list: + if not catalog._namespace_exists(namespace): + catalog.create_namespace(namespace) + + ns_list = catalog.list_namespaces() + for ns in [("db",), ("db%",), ("db2",)]: + assert ns in ns_list + + ns_list = catalog.list_namespaces("db") + assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")] + + ns_list = catalog.list_namespaces("db.ns1") + assert sorted(ns_list) == [("db", "ns1", "ns2")] + + ns_list = catalog.list_namespaces("db.ns1.ns2") + assert len(ns_list) == 0 + + +def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None: + namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2"] + for namespace in namespace_list: + if not catalog._namespace_exists(namespace): + catalog.create_namespace(namespace) + + assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] + + assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")] + + +def test_list_non_existing_namespaces(catalog: Catalog) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.list_namespaces("does_not_exist") + + +# Update namespace properties tests + + +def test_update_namespace_properties(catalog: Catalog, test_namespace: Identifier) -> None: + warehouse_location = "/test/location" + test_properties = { + "comment": "this is a test description", + "location": f"{warehouse_location}/{test_namespace}", + "test_property1": "1", + "test_property2": "2", + "test_property3": "3", + } + removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"} + updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"} + catalog.create_namespace(test_namespace, test_properties) + update_report = catalog.update_namespace_properties(test_namespace, removals, updates) + for k in updates.keys(): + assert k in update_report.updated + for k in removals: + if k == "should_not_removed": + assert k in update_report.missing + else: + assert k in update_report.removed + assert catalog.load_namespace_properties(test_namespace) == { + "comment": "updated test description", + "test_property4": "4", + "test_property5": "5", + "location": f"{warehouse_location}/{test_namespace}", + } + + +def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist( + catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str] +) -> None: + namespace = ".".join(test_namespace) + with pytest.raises(NoSuchNamespaceError, match=f"Namespace {namespace} does not exists"): + catalog.update_namespace_properties(test_namespace, updates=test_table_properties) + + +def test_update_namespace_metadata(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None: + catalog.create_namespace(test_namespace, test_table_properties) + new_metadata = {"key3": "value3", "key4": "value4"} + summary = catalog.update_namespace_properties(test_namespace, updates=new_metadata) + assert catalog._namespace_exists(test_namespace) + assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items() + assert summary.removed == [] + assert sorted(summary.updated) == ["key3", "key4"] + assert summary.missing == [] + + +def test_update_namespace_metadata_removals( + catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str] +) -> None: + catalog.create_namespace(test_namespace, test_table_properties) + new_metadata = {"key3": "value3", "key4": "value4"} + remove_metadata = {"key1"} + summary = catalog.update_namespace_properties(test_namespace, remove_metadata, new_metadata) + assert catalog._namespace_exists(test_namespace) + assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items() + assert remove_metadata.isdisjoint(catalog.load_namespace_properties(test_namespace).keys()) + assert summary.removed == ["key1"] + assert sorted(summary.updated) == ["key3", "key4"] + assert summary.missing == [] + + +# Drop namespace tests + + +def test_drop_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + assert catalog._namespace_exists(namespace) + catalog.create_table(test_table_identifier, table_schema_nested) + with pytest.raises(NamespaceNotEmptyError): + catalog.drop_namespace(namespace) + catalog.drop_table(test_table_identifier) + catalog.drop_namespace(namespace) + assert not catalog._namespace_exists(namespace) + + +def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: Catalog) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.drop_namespace("does_not_exist") + + +def test_drop_namespace_raises_error_when_namespace_not_empty( + catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier +) -> None: + namespace = Catalog.namespace_from(test_table_identifier) + catalog.create_namespace(namespace) + catalog.create_table(test_table_identifier, table_schema_nested) + with pytest.raises(NamespaceNotEmptyError, match=f"Namespace {'.'.join(namespace)} is not empty"): + catalog.drop_namespace(namespace) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 22b9883c6f..f6846195fe 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -15,22 +15,15 @@ # specific language governing permissions and limitations # under the License. -import os from collections.abc import Generator from pathlib import Path -from typing import Any, cast +from typing import cast -import pyarrow as pa import pytest -from pydantic_core import ValidationError -from pytest_lazyfixture import lazy_fixture from sqlalchemy import Engine, create_engine, inspect -from sqlalchemy.exc import ArgumentError, IntegrityError +from sqlalchemy.exc import ArgumentError -from pyiceberg.catalog import ( - Catalog, - load_catalog, -) +from pyiceberg.catalog import load_catalog from pyiceberg.catalog.sql import ( DEFAULT_ECHO_VALUE, DEFAULT_POOL_PRE_PING_VALUE, @@ -39,29 +32,11 @@ SqlCatalogBaseTable, ) from pyiceberg.exceptions import ( - CommitFailedException, - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, - NoSuchNamespaceError, NoSuchPropertyException, - NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL -from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties -from pyiceberg.table.snapshots import Operation -from pyiceberg.table.sorting import ( - NullOrder, - SortDirection, - SortField, - SortOrder, -) -from pyiceberg.transforms import IdentityTransform -from pyiceberg.typedef import Identifier -from pyiceberg.types import IntegerType, NestedField, StringType, strtobool +from pyiceberg.types import NestedField, StringType, strtobool CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()] @@ -71,36 +46,6 @@ def catalog_name() -> str: return "test_sql_catalog" -@pytest.fixture(name="random_table_identifier") -def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: - os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) - return database_name, table_name - - -@pytest.fixture(name="another_random_table_identifier") -def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: - database_name = database_name + "_new" - table_name = table_name + "_new" - os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) - return database_name, table_name - - -@pytest.fixture(name="random_hierarchical_identifier") -def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier: - os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) - return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) - - -@pytest.fixture(name="another_random_hierarchical_identifier") -def fixture_another_random_hierarchical_identifier( - warehouse: Path, hierarchical_namespace_name: str, table_name: str -) -> Identifier: - hierarchical_namespace_name = hierarchical_namespace_name + "_new" - table_name = table_name + "_new" - os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) - return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) - - @pytest.fixture(scope="module") def catalog_memory(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: props = { @@ -135,32 +80,6 @@ def alchemy_engine(catalog_uri: str) -> Engine: return create_engine(catalog_uri) -@pytest.fixture(scope="module") -def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: - props = { - "uri": f"sqlite:////{warehouse}/sql-catalog", - "warehouse": f"file://{warehouse}", - } - catalog = SqlCatalog(catalog_name, **props) - catalog.engine.dialect.supports_sane_rowcount = False - catalog.create_tables() - yield catalog - catalog.destroy_tables() - - -@pytest.fixture(scope="module") -def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: - props = { - "uri": f"sqlite:////{warehouse}/sql-catalog", - "warehouse": f"file://{warehouse}", - PY_IO_IMPL: FSSPEC_FILE_IO, - } - catalog = SqlCatalog(catalog_name, **props) - catalog.create_tables() - yield catalog - catalog.destroy_tables() - - def test_creation_with_no_uri(catalog_name: str) -> None: with pytest.raises(NoSuchPropertyException): SqlCatalog(catalog_name, not_uri="unused") @@ -291,1426 +210,6 @@ def test_creation_when_all_tables_exists(alchemy_engine: Engine, catalog_name: s confirm_all_tables_exist(catalog) -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_create_tables_idempotency(catalog: SqlCatalog) -> None: - # Second initialization should not fail even if tables are already created - catalog.create_tables() - catalog.create_tables() - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - assert table.sort_order().order_id == 0, "Order ID must match" - assert table.sort_order().is_unsorted is True, "Order must be unsorted" - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested, properties={"format-version": "1"}) - assert table.sort_order().order_id == 0, "Order ID must match" - assert table.sort_order().is_unsorted is True, "Order must be unsorted" - assert table.format_version == 1 - assert table.spec() == UNPARTITIONED_PARTITION_SPEC - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_with_pyarrow_schema( - catalog: SqlCatalog, - pyarrow_schema_simple_without_ids: pa.Schema, - iceberg_table_schema_simple: Schema, - table_identifier: Identifier, -) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, pyarrow_schema_simple_without_ids) - assert table.schema() == iceberg_table_schema_simple - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: Identifier) -> None: - import pyarrow as pa - - pyarrow_table = pa.Table.from_arrays( - [ - pa.array([None, "A", "B", "C"]), # 'foo' column - pa.array([1, 2, 3, 4]), # 'bar' column - pa.array([True, None, False, True]), # 'baz' column - pa.array([None, "A", "B", "C"]), # 'large' column - ], - schema=pa.schema( - [ - pa.field("foo", pa.large_string(), nullable=True), - pa.field("bar", pa.int32(), nullable=False), - pa.field("baz", pa.bool_(), nullable=True), - pa.field("large", pa.large_string(), nullable=True), - ] - ), - ) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, pyarrow_table.schema) - table.append(pyarrow_table) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)) - table = catalog.create_table(table_identifier, table_schema_nested, sort_order=order) - given_sort_order = table.sort_order() - assert given_sort_order.order_id == 1, "Order ID must match" - assert len(given_sort_order.fields) == 1, "Order must have 1 field" - assert given_sort_order.fields[0].direction == SortDirection.ASC, "Direction must match" - assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, "Null order must match" - assert isinstance(given_sort_order.fields[0].transform, IdentityTransform), "Transform must match" - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_with_default_warehouse_location( - warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier -) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - catalog.create_table(table_identifier, table_schema_nested) - table = catalog.load_table(table_identifier) - assert table.name() == identifier_tuple - assert table.metadata_location.startswith(f"file://{warehouse}") - assert os.path.exists(table.metadata_location[len("file://") :]) - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_with_given_location_removes_trailing_slash( - warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier -) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - table_name = Catalog.table_name_from(identifier_tuple) - location = f"file://{warehouse}/{catalog.name}/{table_name}-given" - catalog.create_namespace(namespace) - catalog.create_table(table_identifier, table_schema_nested, location=f"{location}/") - table = catalog.load_table(table_identifier) - assert table.name() == identifier_tuple - assert table.metadata_location.startswith(f"file://{warehouse}") - assert os.path.exists(table.metadata_location[len("file://") :]) - assert table.location() == location - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - catalog.create_table(table_identifier, table_schema_nested) - with pytest.raises(TableAlreadyExistsError): - catalog.create_table(table_identifier, table_schema_nested) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_create_table_if_not_exists_duplicated_table( - catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier -) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table1 = catalog.create_table(table_identifier, table_schema_nested) - table2 = catalog.create_table_if_not_exists(table_identifier, table_schema_nested) - assert table1.name() == table2.name() - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_create_table_with_non_existing_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None: - identifier = ("invalid", table_name) - with pytest.raises(NoSuchNamespaceError): - catalog.create_table(identifier, table_schema_nested) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.create_table(table_name, table_schema_nested) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.register_table(table_identifier, metadata_location) - assert table.name() == identifier_tuple - assert table.metadata_location == metadata_location - assert os.path.exists(metadata_location) - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_register_existing_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - catalog.register_table(table_identifier, metadata_location) - with pytest.raises(TableAlreadyExistsError): - catalog.register_table(table_identifier, metadata_location) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_register_table_with_non_existing_namespace(catalog: SqlCatalog, metadata_location: str, table_name: str) -> None: - identifier = ("invalid", table_name) - with pytest.raises(NoSuchNamespaceError): - catalog.register_table(identifier, metadata_location) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_register_table_without_namespace(catalog: SqlCatalog, metadata_location: str, table_name: str) -> None: - with pytest.raises(ValueError): - catalog.register_table(table_name, metadata_location) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - loaded_table = catalog.load_table(table_identifier) - assert table.name() == loaded_table.name() - assert table.metadata_location == loaded_table.metadata_location - assert table.metadata == loaded_table.metadata - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - intermediate = catalog.load_table(table_identifier) - assert intermediate.name() == identifier_tuple - loaded_table = catalog.load_table(intermediate.name()) - assert table.name() == loaded_table.name() - assert table.metadata_location == loaded_table.metadata_location - assert table.metadata == loaded_table.metadata - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - assert table.name() == identifier_tuple - catalog.drop_table(table_identifier) - with pytest.raises(NoSuchTableError): - catalog.load_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - identifier_tuple = Catalog.identifier_to_tuple(table_identifier) - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - assert table.name() == identifier_tuple - catalog.drop_table(table.name()) - with pytest.raises(NoSuchTableError): - catalog.load_table(table.name()) - with pytest.raises(NoSuchTableError): - catalog.load_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: Identifier) -> None: - with pytest.raises(NoSuchTableError): - catalog.drop_table(table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "from_table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "to_table_identifier", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_rename_table( - catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier -) -> None: - from_namespace = Catalog.namespace_from(from_table_identifier) - to_namespace = Catalog.namespace_from(to_table_identifier) - catalog.create_namespace(from_namespace) - catalog.create_namespace(to_namespace) - table = catalog.create_table(from_table_identifier, table_schema_nested) - assert table.name() == from_table_identifier - catalog.rename_table(from_table_identifier, to_table_identifier) - new_table = catalog.load_table(to_table_identifier) - assert new_table.name() == to_table_identifier - assert new_table.metadata_location == table.metadata_location - with pytest.raises(NoSuchTableError): - catalog.load_table(from_table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "from_table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "to_table_identifier", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_rename_table_from_self_identifier( - catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier -) -> None: - from_namespace = Catalog.namespace_from(from_table_identifier) - to_namespace = Catalog.namespace_from(to_table_identifier) - catalog.create_namespace(from_namespace) - catalog.create_namespace(to_namespace) - table = catalog.create_table(from_table_identifier, table_schema_nested) - assert table.name() == from_table_identifier - catalog.rename_table(table.name(), to_table_identifier) - new_table = catalog.load_table(to_table_identifier) - assert new_table.name() == to_table_identifier - assert new_table.metadata_location == table.metadata_location - with pytest.raises(NoSuchTableError): - catalog.load_table(table.name()) - with pytest.raises(NoSuchTableError): - catalog.load_table(from_table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "from_table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "to_table_identifier", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_rename_table_to_existing_one( - catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier -) -> None: - from_namespace = Catalog.namespace_from(from_table_identifier) - to_namespace = Catalog.namespace_from(to_table_identifier) - catalog.create_namespace(from_namespace) - catalog.create_namespace(to_namespace) - table = catalog.create_table(from_table_identifier, table_schema_nested) - assert table.name() == from_table_identifier - new_table = catalog.create_table(to_table_identifier, table_schema_nested) - assert new_table.name() == to_table_identifier - with pytest.raises(TableAlreadyExistsError): - catalog.rename_table(from_table_identifier, to_table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "from_table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "to_table_identifier", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: Identifier, to_table_identifier: Identifier) -> None: - to_namespace = Catalog.namespace_from(to_table_identifier) - catalog.create_namespace(to_namespace) - with pytest.raises(NoSuchTableError): - catalog.rename_table(from_table_identifier, to_table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "from_table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "to_table_identifier", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_rename_table_to_missing_namespace( - catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier -) -> None: - from_namespace = Catalog.namespace_from(from_table_identifier) - catalog.create_namespace(from_namespace) - table = catalog.create_table(from_table_identifier, table_schema_nested) - assert table.name() == from_table_identifier - with pytest.raises(NoSuchNamespaceError): - catalog.rename_table(from_table_identifier, to_table_identifier) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier_1", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -@pytest.mark.parametrize( - "table_identifier_2", - [ - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def test_list_tables( - catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: Identifier, table_identifier_2: Identifier -) -> None: - namespace_1 = Catalog.namespace_from(table_identifier_1) - namespace_2 = Catalog.namespace_from(table_identifier_2) - catalog.create_namespace(namespace_1) - catalog.create_namespace(namespace_2) - catalog.create_table(table_identifier_1, table_schema_nested) - catalog.create_table(table_identifier_2, table_schema_nested) - identifier_list = catalog.list_tables(namespace_1) - assert len(identifier_list) == 1 - assert table_identifier_1 in identifier_list - - identifier_list = catalog.list_tables(namespace_2) - assert len(identifier_list) == 1 - assert table_identifier_2 in identifier_list - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: str) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.list_tables(namespace) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_create_namespace_if_not_exists(catalog: SqlCatalog, database_name: str) -> None: - catalog.create_namespace(database_name) - assert (database_name,) in catalog.list_namespaces() - catalog.create_namespace_if_not_exists(database_name) - assert (database_name,) in catalog.list_namespaces() - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None: - catalog.create_namespace(namespace) - assert (Catalog.identifier_to_tuple(namespace)[:1]) in catalog.list_namespaces() - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) -> None: - catalog.create_namespace(namespace) - with pytest.raises(NamespaceAlreadyExistsError): - catalog.create_namespace(namespace) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace: str) -> None: - catalog.create_namespace(namespace + "_1") - # Second namespace is a prefix of the first one, make sure it can be added. - catalog.create_namespace(namespace) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, namespace: str) -> None: - test_location = "/test/location" - test_properties = { - "comment": "this is a test description", - "location": test_location, - } - catalog.create_namespace(namespace=namespace, properties=test_properties) - loaded_database_list = catalog.list_namespaces() - assert Catalog.identifier_to_tuple(namespace)[:1] in loaded_database_list - properties = catalog.load_namespace_properties(namespace) - assert properties["comment"] == "this is a test description" - assert properties["location"] == test_location - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -@pytest.mark.filterwarnings("ignore") -def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace: str) -> None: - with pytest.raises(IntegrityError): - catalog.create_namespace(namespace=namespace, properties={None: "value"}) # type: ignore - - with pytest.raises(IntegrityError): - catalog.create_namespace(namespace=namespace, properties={"key": None}) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", (" ")]) -def test_create_namespace_with_empty_identifier(catalog: SqlCatalog, empty_namespace: Any) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.create_namespace(empty_namespace) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_namespace_exists(catalog: SqlCatalog) -> None: - for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2")]: - catalog.create_namespace(ns) - assert catalog._namespace_exists(ns) - - assert catalog._namespace_exists("db2") # `db2` exists because `db2.ns1` exists - assert catalog._namespace_exists("db3.ns1") # `db3.ns1` exists because `db3.ns1.ns2` exists - assert not catalog._namespace_exists("db_") # make sure '_' is escaped in the query - assert not catalog._namespace_exists("db%") # make sure '%' is escaped in the query - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_list_namespaces(catalog: SqlCatalog) -> None: - namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%"] - for namespace in namespace_list: - if not catalog._namespace_exists(namespace): - catalog.create_namespace(namespace) - - ns_list = catalog.list_namespaces() - for ns in [("db",), ("db%",), ("db2",)]: - assert ns in ns_list - - ns_list = catalog.list_namespaces("db") - assert sorted(ns_list) == [("db", "ns1"), ("db", "ns2")] - - ns_list = catalog.list_namespaces("db.ns1") - assert sorted(ns_list) == [("db", "ns1", "ns2")] - - ns_list = catalog.list_namespaces("db.ns1.ns2") - assert len(ns_list) == 0 - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None: - namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2"] - for namespace in namespace_list: - if not catalog._namespace_exists(namespace): - catalog.create_namespace(namespace) - - assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] - - assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")] - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.list_namespaces("does_not_exist") - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - assert catalog._namespace_exists(namespace) - catalog.create_table(table_identifier, table_schema_nested) - with pytest.raises(NamespaceNotEmptyError): - catalog.drop_namespace(namespace) - catalog.drop_table(table_identifier) - catalog.drop_namespace(namespace) - assert not catalog._namespace_exists(namespace) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_drop_non_existing_namespaces(catalog: SqlCatalog) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.drop_namespace("does_not_exist") - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: - warehouse_location = "/test/location" - test_properties = { - "comment": "this is a test description", - "location": f"{warehouse_location}/{namespace}", - "test_property1": "1", - "test_property2": "2", - "test_property3": "3", - } - - catalog.create_namespace(namespace, test_properties) - listed_properties = catalog.load_namespace_properties(namespace) - for k, v in listed_properties.items(): - assert k in test_properties - assert v == test_properties[k] - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: - catalog.create_namespace(namespace) - listed_properties = catalog.load_namespace_properties(namespace) - assert listed_properties == {"exists": "true"} - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -def test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) -> None: - with pytest.raises(NoSuchNamespaceError): - catalog.load_namespace_properties("does_not_exist") - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) -def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: - warehouse_location = "/test/location" - test_properties = { - "comment": "this is a test description", - "location": f"{warehouse_location}/{namespace}", - "test_property1": "1", - "test_property2": "2", - "test_property3": "3", - } - removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"} - updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"} - catalog.create_namespace(namespace, test_properties) - update_report = catalog.update_namespace_properties(namespace, removals, updates) - for k in updates.keys(): - assert k in update_report.updated - for k in removals: - if k == "should_not_removed": - assert k in update_report.missing - else: - assert k in update_report.removed - assert catalog.load_namespace_properties(namespace) == { - "comment": "updated test description", - "test_property4": "4", - "test_property5": "5", - "location": f"{warehouse_location}/{namespace}", - } - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) - last_updated_ms = table.metadata.last_updated_ms - original_table_metadata_location = table.metadata_location - original_table_last_updated_ms = table.metadata.last_updated_ms - - assert catalog._parse_metadata_version(table.metadata_location) == 0 - assert table.metadata.current_schema_id == 0 - - transaction = table.transaction() - update = transaction.update_schema() - update.add_column(path="b", field_type=IntegerType()) - update.commit() - transaction.commit_transaction() - - updated_table_metadata = table.metadata - - assert catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.current_schema_id == 1 - assert len(updated_table_metadata.schemas) == 2 - new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1) - assert new_schema - assert new_schema == update._apply() - assert new_schema.find_field("b").field_type == IntegerType() - assert updated_table_metadata.last_updated_ms > last_updated_ms - assert len(updated_table_metadata.metadata_log) == 1 - assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location - assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - lazy_fixture("catalog_sqlite_fsspec"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_simple) - - df = pa.Table.from_pydict( - { - "foo": ["a"], - "bar": [1], - "baz": [True], - }, - schema=schema_to_pyarrow(table_schema_simple), - ) - - table.append(df) - - # new snapshot is written in APPEND mode - assert len(table.metadata.snapshots) == 1 - assert table.metadata.snapshots[0].snapshot_id == table.metadata.current_snapshot_id - assert table.metadata.snapshots[0].parent_snapshot_id is None - assert table.metadata.snapshots[0].sequence_number == 1 - assert table.metadata.snapshots[0].summary is not None - assert table.metadata.snapshots[0].summary.operation == Operation.APPEND - assert table.metadata.snapshots[0].summary["added-data-files"] == "1" - assert table.metadata.snapshots[0].summary["added-records"] == "1" - assert table.metadata.snapshots[0].summary["total-data-files"] == "1" - assert table.metadata.snapshots[0].summary["total-records"] == "1" - assert len(table.metadata.metadata_log) == 1 - - # read back the data - assert df == table.scan().to_arrow() - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - table_a = catalog.create_table(table_identifier, table_schema_simple) - table_b = catalog.load_table(table_identifier) - - with table_a.update_schema() as update: - update.add_column(path="b", field_type=IntegerType()) - - with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"): - # This one should fail since it already has been updated - with table_b.update_schema() as update: - update.add_column(path="c", field_type=IntegerType()) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize("format_version", [1, 2]) -def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None: - identifier = f"default.arrow_write_data_and_evolve_schema_v{format_version}" - - try: - catalog.create_namespace("default") - except NamespaceAlreadyExistsError: - pass - - try: - catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - pa_table = pa.Table.from_pydict( - { - "foo": ["a", None, "z"], - }, - schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]), - ) - - tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}) - - pa_table_with_column = pa.Table.from_pydict( - { - "foo": ["a", None, "z"], - "bar": [19, None, 25], - }, - schema=pa.schema( - [ - pa.field("foo", pa.large_string(), nullable=True), - pa.field("bar", pa.int32(), nullable=True), - ] - ), - ) - - with tbl.transaction() as txn: - with txn.update_schema() as schema_txn: - schema_txn.union_by_name(pa_table_with_column.schema) - - txn.append(pa_table_with_column) - txn.overwrite(pa_table_with_column) - txn.delete("foo = 'a'") - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize("format_version", [1, 2]) -def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> None: - identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" - try: - catalog.create_namespace("default") - except NamespaceAlreadyExistsError: - pass - - try: - catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - pa_table = pa.Table.from_pydict( - { - "foo": ["a", None, "z"], - }, - schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]), - ) - - pa_table_with_column = pa.Table.from_pydict( - { - "foo": ["a", None, "z"], - "bar": [19, None, 25], - }, - schema=pa.schema( - [ - pa.field("foo", pa.large_string(), nullable=True), - pa.field("bar", pa.int32(), nullable=True), - ] - ), - ) - - with catalog.create_table_transaction( - identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)} - ) as txn: - with txn.update_snapshot().fast_append() as snapshot_update: - for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io): - snapshot_update.append_data_file(data_file) - - with txn.update_schema() as schema_txn: - schema_txn.union_by_name(pa_table_with_column.schema) - - with txn.update_snapshot().fast_append() as snapshot_update: - for data_file in _dataframe_to_data_files( - table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io - ): - snapshot_update.append_data_file(data_file) - - tbl = catalog.load_table(identifier=identifier) - assert tbl.format_version == format_version - assert len(tbl.scan().to_arrow()) == 6 - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: - # table properties can be set to int, but still serialized to string - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - property_with_int = {"property_name": 42} - table = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_int) - assert isinstance(table.properties["property_name"], str) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_table_properties_raise_for_none_value( - catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier -) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - property_with_none = {"property_name": None} - with pytest.raises(ValidationError) as exc_info: - _ = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_none) - assert "None type is not a supported value in properties: property_name" in str(exc_info.value) - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: - namespace = Catalog.namespace_from(table_identifier) - catalog.create_namespace(namespace) - catalog.create_table(table_identifier, table_schema_simple, properties={"format-version": "2"}) - existing_table = table_identifier - # Act and Assert for an existing table - assert catalog.table_exists(existing_table) is True - - # Act and Assert for a non-existing table - assert catalog.table_exists(("non", "exist")) is False - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - ], -) -@pytest.mark.parametrize("format_version", [1, 2]) -def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None: - # To catch manifest file name collision bug during merge: - # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918 - catalog.create_namespace_if_not_exists("default") - try: - catalog.drop_table("default.test_merge_manifest") - except NoSuchTableError: - pass - tbl = catalog.create_table( - "default.test_merge_manifest", - arrow_table_with_null.schema, - properties={ - "commit.manifest-merge.enabled": "true", - "commit.manifest.min-count-to-merge": "2", - "format-version": format_version, - }, - ) - - for _ in range(5): - tbl.append(arrow_table_with_null) - - assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null) - current_snapshot = tbl.current_snapshot() - assert current_snapshot - manifests = current_snapshot.manifests(tbl.io) - assert len(manifests) == 1 - - -@pytest.mark.parametrize( - "catalog", - [ - lazy_fixture("catalog_memory"), - lazy_fixture("catalog_sqlite"), - lazy_fixture("catalog_sqlite_without_rowcount"), - ], -) -def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None: - namespace = Catalog.namespace_from(random_table_identifier) - catalog.create_namespace(namespace) - table = catalog.create_table(random_table_identifier, table_schema_nested) - - original_metadata_location = table.metadata_location - - for i in range(5): - with table.transaction() as transaction: - with transaction.update_schema() as update: - update.add_column(path=f"new_column_{i}", field_type=IntegerType()) - - assert len(table.metadata.metadata_log) == 5 - assert os.path.exists(original_metadata_location[len("file://") :]) - - # Set the max versions property to 2, and delete after commit - new_property = { - TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2", - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true", - } - - with table.transaction() as transaction: - transaction.set_properties(properties=new_property) - - # Verify that only the most recent metadata files are kept - assert len(table.metadata.metadata_log) == 2 - updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log - - # new metadata log was added, so earlier metadata logs are removed. - with table.transaction() as transaction: - with transaction.update_schema() as update: - update.add_column(path="new_column_x", field_type=IntegerType()) - - assert len(table.metadata.metadata_log) == 2 - assert not os.path.exists(original_metadata_location[len("file://") :]) - assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :]) - assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :]) - - class TestSqlCatalogClose: """Test SqlCatalog close functionality.""" diff --git a/tests/conftest.py b/tests/conftest.py index 85c15d3e0b..ac451cf1e9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -434,17 +434,6 @@ def iceberg_schema_simple_no_ids() -> Schema: ) -@pytest.fixture(scope="session") -def iceberg_table_schema_simple() -> Schema: - return Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), - schema_id=0, - identifier_field_ids=[], - ) - - @pytest.fixture(scope="session") def iceberg_schema_nested() -> Schema: return Schema( @@ -1887,7 +1876,7 @@ def test_schema() -> Schema: @pytest.fixture(scope="session") -def test_partition_spec() -> Schema: +def test_partition_spec() -> PartitionSpec: return PartitionSpec( PartitionField(1, 1000, IdentityTransform(), "VendorID"), PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 891d4bbac7..707ad424d5 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -27,11 +27,11 @@ from pyiceberg.expressions.literals import LongLiteral from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.schema import Schema -from pyiceberg.table import UpsertResult +from pyiceberg.table import Table, UpsertResult from pyiceberg.table.snapshots import Operation from pyiceberg.table.upsert_util import create_match_filter from pyiceberg.types import IntegerType, NestedField, StringType, StructType -from tests.catalog.test_base import InMemoryCatalog, Table +from tests.catalog.test_base import InMemoryCatalog @pytest.fixture From aa4bfc0c46441521b84e48c6670d1c7b41b68a01 Mon Sep 17 00:00:00 2001 From: nssalian Date: Mon, 12 Jan 2026 20:33:30 -0800 Subject: [PATCH 2/3] fixing linting after merge --- tests/catalog/test_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 73c89f73ec..1a47478313 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -22,7 +22,7 @@ import pytest -from pyiceberg.catalog import load_catalog +from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.memory import InMemoryCatalog from pyiceberg.io import WAREHOUSE from pyiceberg.schema import Schema From d685271e096892251f91ac503676b0af4d9a3b28 Mon Sep 17 00:00:00 2001 From: nssalian Date: Mon, 12 Jan 2026 20:56:48 -0800 Subject: [PATCH 3/3] fix mypy issues --- tests/catalog/conftest.py | 183 ------------------------ tests/catalog/test_catalog_behaviors.py | 64 +++++---- tests/conftest.py | 160 +++++++++++++++++++++ 3 files changed, 200 insertions(+), 207 deletions(-) delete mode 100644 tests/catalog/conftest.py diff --git a/tests/catalog/conftest.py b/tests/catalog/conftest.py deleted file mode 100644 index 1d819af243..0000000000 --- a/tests/catalog/conftest.py +++ /dev/null @@ -1,183 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Pytest fixtures for parameterized catalog behavior tests.""" - -import os -from collections.abc import Generator -from pathlib import Path - -import pytest -from pytest_lazyfixture import lazy_fixture - -from pyiceberg.catalog import Catalog -from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.catalog.sql import SqlCatalog -from pyiceberg.typedef import Identifier - - -def _create_memory_catalog(name: str, warehouse: Path) -> InMemoryCatalog: - return InMemoryCatalog(name, warehouse=f"file://{warehouse}") - - -def _create_sql_catalog(name: str, warehouse: Path) -> SqlCatalog: - catalog = SqlCatalog( - name, - uri="sqlite:///:memory:", - warehouse=f"file://{warehouse}", - ) - catalog.create_tables() - return catalog - - -def _create_sql_without_rowcount_catalog(name: str, warehouse: Path) -> SqlCatalog: - props = { - "uri": f"sqlite:////{warehouse}/sql-catalog", - "warehouse": f"file://{warehouse}", - } - catalog = SqlCatalog(name, **props) - catalog.engine.dialect.supports_sane_rowcount = False - catalog.create_tables() - return catalog - - -_CATALOG_FACTORIES = { - "memory": _create_memory_catalog, - "sql": _create_sql_catalog, - "sql_without_rowcount": _create_sql_without_rowcount_catalog, -} - - -@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) -def catalog(request: pytest.FixtureRequest, tmp_path: Path) -> Generator[Catalog, None, None]: - """Parameterized fixture that yields catalogs listed in _CATALOG_FACTORIES.""" - catalog_type = request.param - factory = _CATALOG_FACTORIES[catalog_type] - cat = factory("test_catalog", tmp_path) - yield cat - if hasattr(cat, "destroy_tables"): - cat.destroy_tables() - - -@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) -def catalog_with_warehouse( - request: pytest.FixtureRequest, - warehouse: Path, -) -> Generator[Catalog, None, None]: - factory = _CATALOG_FACTORIES[request.param] - cat = factory("test_catalog", warehouse) - yield cat - if hasattr(cat, "destroy_tables"): - cat.destroy_tables() - - -@pytest.fixture(name="random_table_identifier") -def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: - os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) - return database_name, table_name - - -@pytest.fixture(name="another_random_table_identifier") -def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: - database_name = database_name + "_new" - table_name = table_name + "_new" - os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) - return database_name, table_name - - -@pytest.fixture(name="random_hierarchical_identifier") -def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier: - os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) - return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) - - -@pytest.fixture(name="another_random_hierarchical_identifier") -def fixture_another_random_hierarchical_identifier( - warehouse: Path, hierarchical_namespace_name: str, table_name: str -) -> Identifier: - hierarchical_namespace_name = hierarchical_namespace_name + "_new" - table_name = table_name + "_new" - os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) - return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) - - -@pytest.fixture(scope="session") -def fixed_test_table_identifier() -> Identifier: - return "com", "organization", "department", "my_table" - - -@pytest.fixture(scope="session") -def another_fixed_test_table_identifier() -> Identifier: - return "com", "organization", "department_alt", "my_another_table" - - -@pytest.fixture(scope="session") -def fixed_test_table_namespace() -> Identifier: - return "com", "organization", "department" - - -@pytest.fixture( - scope="session", - params=[ - lazy_fixture("fixed_test_table_identifier"), - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) -def test_table_identifier(request) -> Identifier: - return request.param - - -@pytest.fixture( - scope="session", - params=[ - lazy_fixture("another_fixed_test_table_identifier"), - lazy_fixture("another_random_table_identifier"), - lazy_fixture("another_random_hierarchical_identifier"), - ], -) -def another_table_identifier(request) -> Identifier: - return request.param - - -@pytest.fixture( - params=[ - lazy_fixture("database_name"), - lazy_fixture("hierarchical_namespace_name"), - lazy_fixture("fixed_test_table_namespace"), - ], -) -def test_namespace(request) -> Identifier: - ns = request.param - if isinstance(ns, tuple): - return ns - if "." in ns: - return tuple(ns.split(".")) - return (ns,) - - -@pytest.fixture(scope="session") -def test_namespace_properties() -> dict[str, str]: - return {"key1": "value1", "key2": "value2"} - - -@pytest.fixture(scope="session") -def test_table_properties() -> dict[str, str]: - return { - "key1": "value1", - "key2": "value2", - } diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py index fe59329e79..2968935893 100644 --- a/tests/catalog/test_catalog_behaviors.py +++ b/tests/catalog/test_catalog_behaviors.py @@ -41,9 +41,10 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import AddSchemaUpdate, SetCurrentSchemaUpdate, TableProperties +from pyiceberg.table import TableProperties from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.table.update import AddSchemaUpdate, SetCurrentSchemaUpdate from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Identifier from pyiceberg.types import BooleanType, IntegerType, LongType, NestedField, StringType @@ -236,7 +237,11 @@ def test_create_table_with_default_warehouse_location( def test_create_table_location_override( - catalog: Catalog, tmp_path: Path, table_schema_nested: Schema, test_table_identifier: Identifier, test_table_properties: dict + catalog: Catalog, + tmp_path: Path, + table_schema_nested: Schema, + test_table_identifier: Identifier, + test_table_properties: dict[str, str], ) -> None: test_partition_spec = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) new_location = f"file://{tmp_path}/new_location" @@ -272,12 +277,15 @@ def test_create_table_removes_trailing_slash_from_location( def test_create_tables_idempotency(catalog: Catalog) -> None: # Second initialization should not fail even if tables are already created - catalog.create_tables() - catalog.create_tables() + catalog.create_tables() # type: ignore[attr-defined] + catalog.create_tables() # type: ignore[attr-defined] def test_create_table_pyarrow_schema( - catalog: Catalog, pyarrow_schema_simple_without_ids: pa.Schema, test_table_identifier: Identifier, test_table_properties: dict + catalog: Catalog, + pyarrow_schema_simple_without_ids: pa.Schema, + test_table_identifier: Identifier, + test_table_properties: dict[str, str], ) -> None: namespace = Catalog.namespace_from(test_table_identifier) catalog.create_namespace(namespace) @@ -544,7 +552,7 @@ def test_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_ original_table_metadata_location = table.metadata_location original_table_last_updated_ms = table.metadata.last_updated_ms - assert catalog._parse_metadata_version(table.metadata_location) == 0 + assert catalog._parse_metadata_version(table.metadata_location) == 0 # type: ignore[attr-defined] assert table.metadata.current_schema_id == 0 transaction = table.transaction() @@ -555,7 +563,7 @@ def test_commit_table(catalog: Catalog, table_schema_nested: Schema, test_table_ updated_table_metadata = table.metadata - assert catalog._parse_metadata_version(table.metadata_location) == 1 + assert catalog._parse_metadata_version(table.metadata_location) == 1 # type: ignore[attr-defined] assert updated_table_metadata.current_schema_id == 1 assert len(updated_table_metadata.schemas) == 2 new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1) @@ -731,7 +739,9 @@ def test_table_writes_metadata_to_custom_location( schema=schema_to_pyarrow(table_schema_simple), ) table.append(df) - manifests = table.current_snapshot().manifests(table.io) + snapshot = table.current_snapshot() + assert snapshot is not None + manifests = snapshot.manifests(table.io) location_provider = table.location_provider() assert location_provider.new_metadata_location("").startswith(metadata_path) @@ -744,7 +754,7 @@ def test_table_writes_metadata_to_default_path( catalog: Catalog, test_table_identifier: Identifier, table_schema_simple: Schema, - test_table_properties: dict, + test_table_properties: dict[str, str], ) -> None: namespace = Catalog.namespace_from(test_table_identifier) catalog.create_namespace(namespace) @@ -759,7 +769,9 @@ def test_table_writes_metadata_to_default_path( schema=schema_to_pyarrow(table_schema_simple), ) table.append(df) - manifests = table.current_snapshot().manifests(table.io) + snapshot = table.current_snapshot() + assert snapshot is not None + manifests = snapshot.manifests(table.io) location_provider = table.location_provider() assert location_provider.new_metadata_location("").startswith(metadata_path) @@ -932,15 +944,15 @@ def test_add_column_with_statement(catalog: Catalog, table_schema_simple: Schema # Namespace tests -def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict) -> None: +def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None: catalog.create_namespace(test_namespace, test_table_properties) - assert catalog._namespace_exists(test_namespace) + assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined] assert (Catalog.identifier_to_tuple(test_namespace)[:1]) in catalog.list_namespaces() assert test_table_properties == catalog.load_namespace_properties(test_namespace) def test_create_namespace_raises_error_on_existing_namespace( - catalog: Catalog, test_namespace: Identifier, test_table_properties: dict + catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str] ) -> None: catalog.create_namespace(test_namespace, test_table_properties) with pytest.raises(NamespaceAlreadyExistsError): @@ -1003,12 +1015,16 @@ def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catal def test_namespace_exists(catalog: Catalog) -> None: for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2")]: catalog.create_namespace(ns) - assert catalog._namespace_exists(ns) + assert catalog._namespace_exists(ns) # type: ignore[attr-defined] - assert catalog._namespace_exists("db2") # `db2` exists because `db2.ns1` exists - assert catalog._namespace_exists("db3.ns1") # `db3.ns1` exists because `db3.ns1.ns2` exists - assert not catalog._namespace_exists("db_") # make sure '_' is escaped in the query - assert not catalog._namespace_exists("db%") # make sure '%' is escaped in the query + # `db2` exists because `db2.ns1` exists + assert catalog._namespace_exists("db2") # type: ignore[attr-defined] + # `db3.ns1` exists because `db3.ns1.ns2` exists + assert catalog._namespace_exists("db3.ns1") # type: ignore[attr-defined] + # make sure '_' is escaped in the query + assert not catalog._namespace_exists("db_") # type: ignore[attr-defined] + # make sure '%' is escaped in the query + assert not catalog._namespace_exists("db%") # type: ignore[attr-defined] # Namespace properties @@ -1048,7 +1064,7 @@ def test_load_empty_namespace_properties(catalog: Catalog, test_namespace: Ident def test_list_namespaces(catalog: Catalog) -> None: namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%"] for namespace in namespace_list: - if not catalog._namespace_exists(namespace): + if not catalog._namespace_exists(namespace): # type: ignore[attr-defined] catalog.create_namespace(namespace) ns_list = catalog.list_namespaces() @@ -1068,7 +1084,7 @@ def test_list_namespaces(catalog: Catalog) -> None: def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None: namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2"] for namespace in namespace_list: - if not catalog._namespace_exists(namespace): + if not catalog._namespace_exists(namespace): # type: ignore[attr-defined] catalog.create_namespace(namespace) assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] @@ -1124,7 +1140,7 @@ def test_update_namespace_metadata(catalog: Catalog, test_namespace: Identifier, catalog.create_namespace(test_namespace, test_table_properties) new_metadata = {"key3": "value3", "key4": "value4"} summary = catalog.update_namespace_properties(test_namespace, updates=new_metadata) - assert catalog._namespace_exists(test_namespace) + assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined] assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items() assert summary.removed == [] assert sorted(summary.updated) == ["key3", "key4"] @@ -1138,7 +1154,7 @@ def test_update_namespace_metadata_removals( new_metadata = {"key3": "value3", "key4": "value4"} remove_metadata = {"key1"} summary = catalog.update_namespace_properties(test_namespace, remove_metadata, new_metadata) - assert catalog._namespace_exists(test_namespace) + assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined] assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items() assert remove_metadata.isdisjoint(catalog.load_namespace_properties(test_namespace).keys()) assert summary.removed == ["key1"] @@ -1152,13 +1168,13 @@ def test_update_namespace_metadata_removals( def test_drop_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None: namespace = Catalog.namespace_from(test_table_identifier) catalog.create_namespace(namespace) - assert catalog._namespace_exists(namespace) + assert catalog._namespace_exists(namespace) # type: ignore[attr-defined] catalog.create_table(test_table_identifier, table_schema_nested) with pytest.raises(NamespaceNotEmptyError): catalog.drop_namespace(namespace) catalog.drop_table(test_table_identifier) catalog.drop_namespace(namespace) - assert not catalog._namespace_exists(namespace) + assert not catalog._namespace_exists(namespace) # type: ignore[attr-defined] def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: Catalog) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 24df8434f4..9d80f48972 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,9 +45,12 @@ import pytest from moto import mock_aws from pydantic_core import to_json +from pytest_lazyfixture import lazy_fixture from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog.memory import InMemoryCatalog from pyiceberg.catalog.noop import NoopCatalog +from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.expressions import BoundReference from pyiceberg.io import ( ADLS_ACCOUNT_KEY, @@ -71,6 +74,7 @@ from pyiceberg.table import FileScanTask, Table from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3 from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.typedef import Identifier from pyiceberg.types import ( BinaryType, BooleanType, @@ -2949,3 +2953,159 @@ def ray_session() -> Generator[Any, None, None]: ) yield ray ray.shutdown() + + +# Catalog fixtures + + +def _create_memory_catalog(name: str, warehouse: Path) -> InMemoryCatalog: + return InMemoryCatalog(name, warehouse=f"file://{warehouse}") + + +def _create_sql_catalog(name: str, warehouse: Path) -> SqlCatalog: + catalog = SqlCatalog( + name, + uri="sqlite:///:memory:", + warehouse=f"file://{warehouse}", + ) + catalog.create_tables() + return catalog + + +def _create_sql_without_rowcount_catalog(name: str, warehouse: Path) -> SqlCatalog: + props = { + "uri": f"sqlite:////{warehouse}/sql-catalog", + "warehouse": f"file://{warehouse}", + } + catalog = SqlCatalog(name, **props) + catalog.engine.dialect.supports_sane_rowcount = False + catalog.create_tables() + return catalog + + +_CATALOG_FACTORIES = { + "memory": _create_memory_catalog, + "sql": _create_sql_catalog, + "sql_without_rowcount": _create_sql_without_rowcount_catalog, +} + + +@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) +def catalog(request: pytest.FixtureRequest, tmp_path: Path) -> Generator[Catalog, None, None]: + """Parameterized fixture that yields catalogs listed in _CATALOG_FACTORIES.""" + catalog_type = request.param + factory = _CATALOG_FACTORIES[catalog_type] + cat = factory("test_catalog", tmp_path) + yield cat + if hasattr(cat, "destroy_tables"): + cat.destroy_tables() + + +@pytest.fixture(params=list(_CATALOG_FACTORIES.keys())) +def catalog_with_warehouse( + request: pytest.FixtureRequest, + warehouse: Path, +) -> Generator[Catalog, None, None]: + factory = _CATALOG_FACTORIES[request.param] + cat = factory("test_catalog", warehouse) + yield cat + if hasattr(cat, "destroy_tables"): + cat.destroy_tables() + + +@pytest.fixture(name="random_table_identifier") +def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: + os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) + return database_name, table_name + + +@pytest.fixture(name="another_random_table_identifier") +def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: + database_name = database_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{database_name}/{table_name}/metadata/", exist_ok=True) + return database_name, table_name + + +@pytest.fixture(name="random_hierarchical_identifier") +def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier: + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + +@pytest.fixture(name="another_random_hierarchical_identifier") +def fixture_another_random_hierarchical_identifier( + warehouse: Path, hierarchical_namespace_name: str, table_name: str +) -> Identifier: + hierarchical_namespace_name = hierarchical_namespace_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + +@pytest.fixture(scope="session") +def fixed_test_table_identifier() -> Identifier: + return "com", "organization", "department", "my_table" + + +@pytest.fixture(scope="session") +def another_fixed_test_table_identifier() -> Identifier: + return "com", "organization", "department_alt", "my_another_table" + + +@pytest.fixture(scope="session") +def fixed_test_table_namespace() -> Identifier: + return "com", "organization", "department" + + +@pytest.fixture( + scope="session", + params=[ + lazy_fixture("fixed_test_table_identifier"), + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + ], +) +def test_table_identifier(request: pytest.FixtureRequest) -> Identifier: + return request.param + + +@pytest.fixture( + scope="session", + params=[ + lazy_fixture("another_fixed_test_table_identifier"), + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + ], +) +def another_table_identifier(request: pytest.FixtureRequest) -> Identifier: + return request.param + + +@pytest.fixture( + params=[ + lazy_fixture("database_name"), + lazy_fixture("hierarchical_namespace_name"), + lazy_fixture("fixed_test_table_namespace"), + ], +) +def test_namespace(request: pytest.FixtureRequest) -> Identifier: + ns = request.param + if isinstance(ns, tuple): + return ns + if "." in ns: + return tuple(ns.split(".")) + return (ns,) + + +@pytest.fixture(scope="session") +def test_namespace_properties() -> dict[str, str]: + return {"key1": "value1", "key2": "value2"} + + +@pytest.fixture(scope="session") +def test_table_properties() -> dict[str, str]: + return { + "key1": "value1", + "key2": "value2", + }