12 changes: 0 additions & 12 deletions docs/en/sql-reference/table-functions/iceberg.md
@@ -88,12 +88,6 @@ ENGINE = IcebergS3(
'minio_access_key',
'minio_secret_key'
)
SETTINGS
storage_catalog_type="rest",
storage_warehouse="demo",
object_storage_endpoint="http://minio:9000/warehouse-rest",
storage_region="us-east-1",
storage_catalog_url="http://rest:8181/v1"
```

Or, using AWS Glue Data Catalog with S3:
@@ -104,12 +98,6 @@ ENGINE = IcebergS3(
'aws_access_key',
'aws_secret_key'
)
SETTINGS
storage_catalog_type = 'glue',
storage_warehouse = 'my_database',
object_storage_endpoint = 's3://my-data-bucket/',
storage_region = 'us-east-1',
storage_catalog_url = 'https://glue.us-east-1.amazonaws.com/iceberg/v1'
```
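
With the per-table `storage_*` settings removed from both examples, the catalog connection is configured once on a `DataLakeCatalog` database and the table statement stays plain. A minimal sketch of the intended usage, assembled from the database-level settings exercised elsewhere in this PR; the endpoint, warehouse name, and credentials below are illustrative placeholders, not prescribed values:

```sql
-- Sketch only: catalog connectivity lives on the database (values are placeholders).
CREATE DATABASE datalake ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio_access_key', 'minio_secret_key')
SETTINGS catalog_type = 'rest', warehouse = 'demo', storage_endpoint = 'http://minio:9000/warehouse-rest';

-- The table itself no longer carries the removed storage_* settings.
CREATE TABLE datalake.`default.my_table` (x Int32)
ENGINE = IcebergS3('http://minio:9000/warehouse-rest/my_table/', 'minio_access_key', 'minio_secret_key');
```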

## Schema Evolution {#schema-evolution}
7 changes: 4 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -156,12 +156,12 @@ void DatabaseDataLake::validateSettings()

std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
{
if (settings[DatabaseDataLakeSetting::catalog_type].value == DatabaseDataLakeCatalogType::NONE)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unspecified catalog type");

if (catalog_impl)
return catalog_impl;

if (settings[DatabaseDataLakeSetting::catalog_type].value == DatabaseDataLakeCatalogType::NONE)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unspecified catalog type");

auto catalog_parameters = DataLake::CatalogSettings{
.storage_endpoint = settings[DatabaseDataLakeSetting::storage_endpoint].value,
.aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
@@ -300,6 +300,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
break;
}
}

return catalog_impl;
}

2 changes: 1 addition & 1 deletion src/Databases/DataLake/DatabaseDataLake.h
@@ -63,6 +63,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
const String & name,
bool /*sync*/) override;

std::shared_ptr<DataLake::ICatalog> getCatalog() const;
protected:
ASTPtr getCreateDatabaseQueryImpl() const override TSA_REQUIRES(mutex);
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
@@ -82,7 +83,6 @@ class DatabaseDataLake final : public IDatabase, WithContext
mutable std::shared_ptr<DataLake::ICatalog> catalog_impl;

void validateSettings();
std::shared_ptr<DataLake::ICatalog> getCatalog() const;

StorageObjectStorageConfigurationPtr getConfiguration(
DatabaseDataLakeStorageType type,
60 changes: 18 additions & 42 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -39,12 +39,12 @@
#include <Common/filesystemHelpers.h>
#include <Disks/DiskType.h>
#include <Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.h>
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Disks/DiskObjectStorage/DiskObjectStorage.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Databases/DataLake/DatabaseDataLake.h>
#include <Core/Settings.h>

#include <fmt/ranges.h>
@@ -348,46 +348,22 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
catalog);
}

std::shared_ptr<DataLake::ICatalog> getCatalog([[maybe_unused]] ContextPtr context, [[maybe_unused]] bool is_attach) const override
std::shared_ptr<DataLake::ICatalog> getCatalog([[maybe_unused]] ContextPtr context, [[maybe_unused]] const StorageID & table_id) const override
{
#if USE_AWS_S3 && USE_AVRO
if ((*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::GLUE)
{
auto catalog_parameters = DataLake::CatalogSettings{
.storage_endpoint = (*settings)[DataLakeStorageSetting::object_storage_endpoint].value,
.aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value,
.aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value,
.region = (*settings)[DataLakeStorageSetting::storage_region].value,
.namespaces = catalog_namespaces,
.aws_role_arn = (*settings)[DataLakeStorageSetting::storage_aws_role_arn].value,
.aws_role_session_name = (*settings)[DataLakeStorageSetting::storage_aws_role_session_name].value
};

return std::make_shared<DataLake::GlueCatalog>(
(*settings)[DataLakeStorageSetting::storage_catalog_url].value,
context,
catalog_parameters,
/* table_engine_definition */nullptr
);
}
/// Attach condition is provided for compatibility.
if ((*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::ICEBERG_REST ||
(is_attach && (*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::NONE && !(*settings)[DataLakeStorageSetting::storage_catalog_url].value.empty()))
{
return std::make_shared<DataLake::RestCatalog>(
(*settings)[DataLakeStorageSetting::storage_warehouse].value,
(*settings)[DataLakeStorageSetting::storage_catalog_url].value,
(*settings)[DataLakeStorageSetting::storage_catalog_credential].value,
(*settings)[DataLakeStorageSetting::storage_auth_scope].value,
(*settings)[DataLakeStorageSetting::storage_auth_header],
(*settings)[DataLakeStorageSetting::storage_oauth_server_uri].value,
(*settings)[DataLakeStorageSetting::storage_oauth_server_use_request_body].value,
catalog_namespaces,
context);
}

#endif
#if USE_AVRO && USE_PARQUET
if ((*settings)[DataLakeStorageSetting::storage_catalog_type].changed || (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].changed)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Don't use deprecated settings storage_catalog_type and storage_catalog_url");
const String db_name = table_id.hasDatabase() ? table_id.database_name : context->getCurrentDatabase();
DatabasePtr database = DatabaseCatalog::instance().tryGetDatabase(db_name);
if (!database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database {} not found", db_name);
auto datalake_database = std::dynamic_pointer_cast<DatabaseDataLake>(database);
if (!datalake_database)
return nullptr;
return datalake_database->getCatalog();
#else
return nullptr;
#endif
}

bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional<FormatSettings> & format_settings) override
@@ -830,8 +806,8 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu
ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override
{ return getImpl().getColumnMapperForCurrentSchema(storage_metadata_snapshot, context); }

std::shared_ptr<DataLake::ICatalog> getCatalog(ContextPtr context, bool is_attach) const override
{ return getImpl().getCatalog(context, is_attach); }
std::shared_ptr<DataLake::ICatalog> getCatalog(ContextPtr context, const StorageID & table_id) const override
{ return getImpl().getCatalog(context, table_id); }

bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional<FormatSettings> & format_settings) override
{ return getImpl().optimize(metadata_snapshot, context, format_settings); }
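Net effect of the rewritten `getCatalog` above: instead of constructing a Glue or REST catalog from per-table `storage_*` settings, the configuration resolves the table's database through `DatabaseCatalog` and reuses that database's catalog, while the deprecated settings are rejected. A hedged sketch of the user-visible behaviour (table names and values are placeholders, and the exact rejection condition is simplified):

```sql
-- Roughly what is now rejected with BAD_ARGUMENTS
-- ("Don't use deprecated settings storage_catalog_type and storage_catalog_url"):
CREATE TABLE t (x Int32)
ENGINE = IcebergS3('http://minio:9000/warehouse-rest/t/', 'minio_access_key', 'minio_secret_key')
SETTINGS storage_catalog_type = 'rest', storage_catalog_url = 'http://rest:8181/v1';

-- Instead, the table is created inside a DataLakeCatalog database and inherits its catalog:
CREATE TABLE datalake.`default.t` (x Int32)
ENGINE = IcebergS3('http://minio:9000/warehouse-rest/t/', 'minio_access_key', 'minio_secret_key');
```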
@@ -313,7 +313,10 @@ class StorageObjectStorageConfiguration
virtual ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr /**/, ContextPtr /**/) const { return nullptr; }


virtual std::shared_ptr<DataLake::ICatalog> getCatalog(ContextPtr /*context*/, bool /*is_attach*/) const { return nullptr; }
virtual std::shared_ptr<DataLake::ICatalog> getCatalog(ContextPtr /*context*/, const StorageID & /*table_id*/) const
{
return nullptr;
}

virtual bool optimize(const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/, const std::optional<FormatSettings> & /*format_settings*/)
{
@@ -102,7 +102,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
args.comment,
format_settings,
args.mode,
configuration->getCatalog(context, args.query.attach),
configuration->getCatalog(context, args.table_id),
args.query.if_not_exists,
/* is_datalake_query */ false,
/* is_table_function */ false,
2 changes: 1 addition & 1 deletion src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -267,7 +267,7 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration, is_data_lake>::
/* comment */ String{},
/* format_settings */ std::nullopt, /// No format_settings
/* mode */ LoadingStrictnessLevel::CREATE,
configuration->getCatalog(context, /* attach */ false),
configuration->getCatalog(context, StorageID(getDatabaseName(), table_name)),
/* if_not_exists */ false,
/* is_datalake_query*/ false,
/* is_table_function */ true);
28 changes: 12 additions & 16 deletions tests/integration/test_database_glue/test.py
@@ -210,33 +210,29 @@ def create_clickhouse_glue_database(
node.query(
f"""
DROP DATABASE IF EXISTS {name};
SET allow_database_glue_catalog=true;
SET write_full_path_in_iceberg_metadata=true;
CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}'{credential_args})
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
""",
settings={
"allow_database_glue_catalog": 1,
"write_full_path_in_iceberg_metadata": 1,
},
)

def create_clickhouse_glue_table(
started_cluster, node, database_name, table_name, schema, additional_settings={}
):
settings = {
"storage_catalog_type": "glue",
"storage_warehouse": "test",
"object_storage_endpoint": "http://minio:9000/warehouse-glue",
"storage_region": "us-east-1",
"storage_catalog_url" : BASE_URL
}

settings.update(additional_settings)
settings_suffix = "" if len(additional_settings) == 0 else f"SETTINGS {",".join((k+"="+repr(v) for k, v in additional_settings.items()))}"

node.query(
f"""
SET allow_experimental_database_glue_catalog=true;
SET write_full_path_in_iceberg_metadata=true;
CREATE TABLE {CATALOG_NAME}.`{database_name}.{table_name}` {schema} ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{table_name}/', '{minio_access_key}', '{minio_secret_key}')
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
{settings_suffix}
""",
settings={
"allow_experimental_database_glue_catalog": 1,
"write_full_path_in_iceberg_metadata": 1,
},
)

show_result = node.query(f"SHOW DATABASE {CATALOG_NAME}")
47 changes: 20 additions & 27 deletions tests/integration/test_database_iceberg/test.py
@@ -141,11 +141,13 @@ def create_clickhouse_iceberg_database(
node.query(
f"""
DROP DATABASE IF EXISTS {name};
SET allow_database_iceberg=true;
SET write_full_path_in_iceberg_metadata=1;
CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}')
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
""",
settings={
"allow_database_iceberg": 1,
"write_full_path_in_iceberg_metadata": 1,
},
)
show_result = node.query(f"SHOW DATABASE {name}")
assert minio_secret_key not in show_result
@@ -154,23 +156,16 @@ def create_clickhouse_iceberg_database(
def create_clickhouse_iceberg_table(
started_cluster, node, database_name, table_name, schema, additional_settings={}
):
settings = {
"storage_catalog_type": "rest",
"storage_warehouse": "demo",
"object_storage_endpoint": "http://minio:9000/warehouse-rest",
"storage_region": "us-east-1",
"storage_catalog_url" : BASE_URL,
}

settings.update(additional_settings)

settings_suffix = "" if len(additional_settings) == 0 else f"SETTINGS {",".join((k+"="+repr(v) for k, v in additional_settings.items()))}"
node.query(
f"""
SET allow_experimental_database_iceberg=true;
SET write_full_path_in_iceberg_metadata=1;
CREATE TABLE {CATALOG_NAME}.`{database_name}.{table_name}` {schema} ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{table_name}/', '{minio_access_key}', '{minio_secret_key}')
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
{settings_suffix}
""",
settings={
"allow_experimental_database_iceberg": 1,
"write_full_path_in_iceberg_metadata": 1,
},
)

def drop_clickhouse_iceberg_table(
@@ -730,11 +725,13 @@ def test_not_specified_catalog_type(started_cluster):
node.query(
f"""
DROP DATABASE IF EXISTS {CATALOG_NAME};
SET allow_database_iceberg=true;
SET write_full_path_in_iceberg_metadata=1;
CREATE DATABASE {CATALOG_NAME} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
""",
settings={
"allow_database_iceberg": 1,
"write_full_path_in_iceberg_metadata": 1,
},
)
assert "" == node.query(f"SHOW TABLES FROM {CATALOG_NAME}")

@@ -827,12 +824,7 @@ def test_gcs(started_cluster):
node = started_cluster.instances["node1"]

node.query("SYSTEM ENABLE FAILPOINT database_iceberg_gcs")
node.query(
f"""
DROP DATABASE IF EXISTS {CATALOG_NAME};
SET allow_database_iceberg = 1;
"""
)
node.query(f"DROP DATABASE IF EXISTS {CATALOG_NAME};")

with pytest.raises(Exception) as err:
node.query(
@@ -842,7 +834,8 @@
SETTINGS
catalog_type = 'rest',
warehouse = 'demo',
"""
""",
settings={"allow_database_iceberg": 1},
)
assert "Google cloud storage converts to S3" in str(err.value)
