Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,17 @@ def view_exists(self, identifier: str | Identifier) -> bool:
bool: True if the view exists, False otherwise.
"""

@abstractmethod
def namespace_exists(self, namespace: str | Identifier) -> bool:
"""Check if a namespace exists.

Args:
namespace (str | Identifier): Namespace identifier.

Returns:
bool: True if the namespace exists, False otherwise.
"""

@abstractmethod
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down Expand Up @@ -845,6 +856,21 @@ def table_exists(self, identifier: str | Identifier) -> bool:
except NoSuchTableError:
return False

def namespace_exists(self, namespace: str | Identifier) -> bool:
"""Check if a namespace exists.

Args:
namespace (str | Identifier): Namespace identifier.

Returns:
bool: True if the namespace exists, False otherwise.
"""
try:
self.load_namespace_properties(namespace)
return True
except NoSuchNamespaceError:
return False

def purge_table(self, identifier: str | Identifier) -> None:
table = self.load_table(identifier)
self.drop_table(identifier)
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,8 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]:
def view_exists(self, identifier: str | Identifier) -> bool:
raise NotImplementedError

def namespace_exists(self, namespace: str | Identifier) -> bool:
raise NotImplementedError

def drop_view(self, identifier: str | Identifier) -> None:
raise NotImplementedError
20 changes: 10 additions & 10 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def create_table(

namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace_identifier):
if not self.namespace_exists(namespace_identifier):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_identifier}")

namespace = Catalog.namespace_to_string(namespace_identifier)
Expand Down Expand Up @@ -251,7 +251,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace):
if not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

with Session(self.engine) as session:
Expand Down Expand Up @@ -361,7 +361,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
to_namespace_tuple = Catalog.namespace_from(to_identifier)
to_namespace = Catalog.namespace_to_string(to_namespace_tuple)
to_table_name = Catalog.table_name_from(to_identifier)
if not self._namespace_exists(to_namespace):
if not self.namespace_exists(to_namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}")
with Session(self.engine) as session:
try:
Expand Down Expand Up @@ -495,7 +495,7 @@ def commit_table(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

def _namespace_exists(self, identifier: str | Identifier) -> bool:
def namespace_exists(self, identifier: str | Identifier) -> bool:
namespace_tuple = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple, NoSuchNamespaceError)
namespace_starts_with = namespace.replace("!", "!!").replace("_", "!_").replace("%", "!%") + ".%"
Expand Down Expand Up @@ -537,7 +537,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
Raises:
NamespaceAlreadyExistsError: If a namespace with the given name already exists.
"""
if self._namespace_exists(namespace):
if self.namespace_exists(namespace):
raise NamespaceAlreadyExistsError(f"Namespace {namespace} already exists")

if not properties:
Expand Down Expand Up @@ -565,7 +565,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
NoSuchNamespaceError: If a namespace with the given name does not exist.
NamespaceNotEmptyError: If the namespace is not empty.
"""
if not self._namespace_exists(namespace):
if not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

namespace_str = Catalog.namespace_to_string(namespace)
Expand Down Expand Up @@ -593,7 +593,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""
if namespace and not self._namespace_exists(namespace):
if namespace and not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

namespace = Catalog.namespace_to_string(namespace)
Expand All @@ -614,7 +614,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""
if namespace and not self._namespace_exists(namespace):
if namespace and not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

table_stmt = select(IcebergTables.table_namespace).where(IcebergTables.catalog_name == self.name)
Expand Down Expand Up @@ -656,7 +656,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""
namespace_str = Catalog.namespace_to_string(namespace)
if not self._namespace_exists(namespace):
if not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists")

stmt = select(IcebergNamespaceProperties).where(
Expand All @@ -681,7 +681,7 @@ def update_namespace_properties(
ValueError: If removals and updates have overlapping keys.
"""
namespace_str = Catalog.namespace_to_string(namespace)
if not self._namespace_exists(namespace):
if not self.namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists")

current_properties = self.load_namespace_properties(namespace=namespace)
Expand Down
24 changes: 12 additions & 12 deletions tests/catalog/test_catalog_behaviors.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ def test_add_column_with_statement(catalog: Catalog, table_schema_simple: Schema

def test_create_namespace(catalog: Catalog, test_namespace: Identifier, test_table_properties: dict[str, str]) -> None:
catalog.create_namespace(test_namespace, test_table_properties)
assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined]
assert catalog.namespace_exists(test_namespace)
assert (Catalog.identifier_to_tuple(test_namespace)[:1]) in catalog.list_namespaces()
assert test_table_properties == catalog.load_namespace_properties(test_namespace)

Expand Down Expand Up @@ -1015,16 +1015,16 @@ def test_get_namespace_metadata_raises_error_when_namespace_does_not_exist(catal
def test_namespace_exists(catalog: Catalog) -> None:
for ns in [("db1",), ("db1", "ns1"), ("db2", "ns1"), ("db3", "ns1", "ns2")]:
catalog.create_namespace(ns)
assert catalog._namespace_exists(ns) # type: ignore[attr-defined]
assert catalog.namespace_exists(ns)

# `db2` exists because `db2.ns1` exists
assert catalog._namespace_exists("db2") # type: ignore[attr-defined]
assert catalog.namespace_exists("db2")
# `db3.ns1` exists because `db3.ns1.ns2` exists
assert catalog._namespace_exists("db3.ns1") # type: ignore[attr-defined]
assert catalog.namespace_exists("db3.ns1")
# make sure '_' is escaped in the query
assert not catalog._namespace_exists("db_") # type: ignore[attr-defined]
assert not catalog.namespace_exists("db_")
# make sure '%' is escaped in the query
assert not catalog._namespace_exists("db%") # type: ignore[attr-defined]
assert not catalog.namespace_exists("db%")


# Namespace properties
Expand Down Expand Up @@ -1064,7 +1064,7 @@ def test_load_empty_namespace_properties(catalog: Catalog, test_namespace: Ident
def test_list_namespaces(catalog: Catalog) -> None:
namespace_list = ["db", "db.ns1", "db.ns1.ns2", "db.ns2", "db2", "db2.ns1", "db%"]
for namespace in namespace_list:
if not catalog._namespace_exists(namespace): # type: ignore[attr-defined]
if not catalog.namespace_exists(namespace):
catalog.create_namespace(namespace)

ns_list = catalog.list_namespaces()
Expand All @@ -1084,7 +1084,7 @@ def test_list_namespaces(catalog: Catalog) -> None:
def test_list_namespaces_fuzzy_match(catalog: Catalog) -> None:
namespace_list = ["db.ns1", "db.ns1.ns2", "db.ns2", "db.ns1X.ns3", "db_.ns1.ns2", "db2.ns1.ns2"]
for namespace in namespace_list:
if not catalog._namespace_exists(namespace): # type: ignore[attr-defined]
if not catalog.namespace_exists(namespace):
catalog.create_namespace(namespace)

assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")]
Expand Down Expand Up @@ -1140,7 +1140,7 @@ def test_update_namespace_metadata(catalog: Catalog, test_namespace: Identifier,
catalog.create_namespace(test_namespace, test_table_properties)
new_metadata = {"key3": "value3", "key4": "value4"}
summary = catalog.update_namespace_properties(test_namespace, updates=new_metadata)
assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined]
assert catalog.namespace_exists(test_namespace)
assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items()
assert summary.removed == []
assert sorted(summary.updated) == ["key3", "key4"]
Expand All @@ -1154,7 +1154,7 @@ def test_update_namespace_metadata_removals(
new_metadata = {"key3": "value3", "key4": "value4"}
remove_metadata = {"key1"}
summary = catalog.update_namespace_properties(test_namespace, remove_metadata, new_metadata)
assert catalog._namespace_exists(test_namespace) # type: ignore[attr-defined]
assert catalog.namespace_exists(test_namespace)
assert new_metadata.items() <= catalog.load_namespace_properties(test_namespace).items()
assert remove_metadata.isdisjoint(catalog.load_namespace_properties(test_namespace).keys())
assert summary.removed == ["key1"]
Expand All @@ -1168,13 +1168,13 @@ def test_update_namespace_metadata_removals(
def test_drop_namespace(catalog: Catalog, table_schema_nested: Schema, test_table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(test_table_identifier)
catalog.create_namespace(namespace)
assert catalog._namespace_exists(namespace) # type: ignore[attr-defined]
assert catalog.namespace_exists(namespace)
catalog.create_table(test_table_identifier, table_schema_nested)
with pytest.raises(NamespaceNotEmptyError):
catalog.drop_namespace(namespace)
catalog.drop_table(test_table_identifier)
catalog.drop_namespace(namespace)
assert not catalog._namespace_exists(namespace) # type: ignore[attr-defined]
assert not catalog.namespace_exists(namespace)


def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: Catalog) -> None:
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,10 @@ def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schem
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:
assert not test_catalog.namespace_exists(database_name)

test_catalog.create_namespace(database_name)
assert test_catalog.namespace_exists(database_name)
assert (database_name,) in test_catalog.list_namespaces()


Expand Down