diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md index c0c27b384429..c92de0311de7 100644 --- a/docs/en/sql-reference/table-functions/iceberg.md +++ b/docs/en/sql-reference/table-functions/iceberg.md @@ -88,12 +88,6 @@ ENGINE = IcebergS3( 'minio_access_key', 'minio_secret_key' ) -SETTINGS - storage_catalog_type="rest", - storage_warehouse="demo", - object_storage_endpoint="http://minio:9000/warehouse-rest", - storage_region="us-east-1", - storage_catalog_url="http://rest:8181/v1" ``` Or, using AWS Glue Data Catalog with S3: @@ -104,12 +98,6 @@ ENGINE = IcebergS3( 'aws_access_key', 'aws_secret_key' ) -SETTINGS - storage_catalog_type = 'glue', - storage_warehouse = 'my_database', - object_storage_endpoint = 's3://my-data-bucket/', - storage_region = 'us-east-1', - storage_catalog_url = 'https://glue.us-east-1.amazonaws.com/iceberg/v1' ``` ## Schema Evolution {#schema-evolution} diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 6f2e6872df10..76298c317371 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -156,12 +156,12 @@ void DatabaseDataLake::validateSettings() std::shared_ptr DatabaseDataLake::getCatalog() const { - if (settings[DatabaseDataLakeSetting::catalog_type].value == DatabaseDataLakeCatalogType::NONE) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unspecified catalog type"); - if (catalog_impl) return catalog_impl; + if (settings[DatabaseDataLakeSetting::catalog_type].value == DatabaseDataLakeCatalogType::NONE) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unspecified catalog type"); + auto catalog_parameters = DataLake::CatalogSettings{ .storage_endpoint = settings[DatabaseDataLakeSetting::storage_endpoint].value, .aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value, @@ -300,6 +300,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const break; } } + return catalog_impl; } diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h index efaf9f9db316..5154b8225d18 100644 --- a/src/Databases/DataLake/DatabaseDataLake.h +++ b/src/Databases/DataLake/DatabaseDataLake.h @@ -63,6 +63,7 @@ class DatabaseDataLake final : public IDatabase, WithContext const String & name, bool /*sync*/) override; + std::shared_ptr getCatalog() const; protected: ASTPtr getCreateDatabaseQueryImpl() const override TSA_REQUIRES(mutex); ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override; @@ -82,7 +83,6 @@ class DatabaseDataLake final : public IDatabase, WithContext mutable std::shared_ptr catalog_impl; void validateSettings(); - std::shared_ptr getCatalog() const; StorageObjectStorageConfigurationPtr getConfiguration( DatabaseDataLakeStorageType type, diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 7ae6fdbd682b..74d7ceea7f93 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -39,12 +39,12 @@ #include #include #include -#include -#include #include #include #include #include +#include +#include #include #include @@ -348,46 +348,22 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl catalog); } - std::shared_ptr getCatalog([[maybe_unused]] ContextPtr context, [[maybe_unused]] bool is_attach) const override + std::shared_ptr getCatalog([[maybe_unused]] ContextPtr context, [[maybe_unused]] const StorageID & table_id) const override { -#if USE_AWS_S3 && USE_AVRO - if ((*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::GLUE) - { - auto catalog_parameters = DataLake::CatalogSettings{ - .storage_endpoint = (*settings)[DataLakeStorageSetting::object_storage_endpoint].value, - .aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value, - .aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value, - .region = (*settings)[DataLakeStorageSetting::storage_region].value, - .namespaces = catalog_namespaces, - .aws_role_arn = (*settings)[DataLakeStorageSetting::storage_aws_role_arn].value, - .aws_role_session_name = (*settings)[DataLakeStorageSetting::storage_aws_role_session_name].value - }; - - return std::make_shared( - (*settings)[DataLakeStorageSetting::storage_catalog_url].value, - context, - catalog_parameters, - /* table_engine_definition */nullptr - ); - } - /// Attach condition is provided for compatibility. - if ((*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::ICEBERG_REST || - (is_attach && (*settings)[DataLakeStorageSetting::storage_catalog_type].value == DatabaseDataLakeCatalogType::NONE && !(*settings)[DataLakeStorageSetting::storage_catalog_url].value.empty())) - { - return std::make_shared( - (*settings)[DataLakeStorageSetting::storage_warehouse].value, - (*settings)[DataLakeStorageSetting::storage_catalog_url].value, - (*settings)[DataLakeStorageSetting::storage_catalog_credential].value, - (*settings)[DataLakeStorageSetting::storage_auth_scope].value, - (*settings)[DataLakeStorageSetting::storage_auth_header], - (*settings)[DataLakeStorageSetting::storage_oauth_server_uri].value, - (*settings)[DataLakeStorageSetting::storage_oauth_server_use_request_body].value, - catalog_namespaces, - context); - } - -#endif +#if USE_AVRO && USE_PARQUET + if ((*settings)[DataLakeStorageSetting::storage_catalog_type].changed || (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].changed) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Don't use deprecated settings storage_catalog_type and storage_catalog_url"); + const String db_name = table_id.hasDatabase() ? table_id.database_name : context->getCurrentDatabase(); + DatabasePtr database = DatabaseCatalog::instance().tryGetDatabase(db_name); + if (!database) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database {} not found", db_name); + auto datalake_database = std::dynamic_pointer_cast(database); + if (!datalake_database) + return nullptr; + return datalake_database->getCatalog(); +#else return nullptr; +#endif } bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override @@ -830,8 +806,8 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override { return getImpl().getColumnMapperForCurrentSchema(storage_metadata_snapshot, context); } - std::shared_ptr getCatalog(ContextPtr context, bool is_attach) const override - { return getImpl().getCatalog(context, is_attach); } + std::shared_ptr getCatalog(ContextPtr context, const StorageID & table_id) const override + { return getImpl().getCatalog(context, table_id); } bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override { return getImpl().optimize(metadata_snapshot, context, format_settings); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index 91cd56571906..0ce750c6193a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -313,7 +313,10 @@ class StorageObjectStorageConfiguration virtual ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr /**/, ContextPtr /**/) const { return nullptr; } - virtual std::shared_ptr getCatalog(ContextPtr /*context*/, bool /*is_attach*/) const { return nullptr; } + virtual std::shared_ptr getCatalog(ContextPtr /*context*/, const StorageID & /*table_id*/) const + { + return nullptr; + } virtual bool optimize(const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/, const std::optional & /*format_settings*/) { diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 0861ac676b4d..96bdbdb48a03 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -102,7 +102,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject args.comment, format_settings, args.mode, - configuration->getCatalog(context, args.query.attach), + configuration->getCatalog(context, args.table_id), args.query.if_not_exists, /* is_datalake_query */ false, /* is_table_function */ false, diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 0b9a976efa6b..adad1933712d 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -267,7 +267,7 @@ StoragePtr TableFunctionObjectStorage:: /* comment */ String{}, /* format_settings */ std::nullopt, /// No format_settings /* mode */ LoadingStrictnessLevel::CREATE, - configuration->getCatalog(context, /* attach */ false), + configuration->getCatalog(context, StorageID(getDatabaseName(), table_name)), /* if_not_exists */ false, /* is_datalake_query*/ false, /* is_table_function */ true); diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py index eece311d7fe8..4738a3ac6977 100644 --- a/tests/integration/test_database_glue/test.py +++ b/tests/integration/test_database_glue/test.py @@ -210,33 +210,29 @@ def create_clickhouse_glue_database( node.query( f""" DROP DATABASE IF EXISTS {name}; -SET allow_database_glue_catalog=true; -SET write_full_path_in_iceberg_metadata=true; CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}'{credential_args}) SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} - """ + """, + settings={ + "allow_database_glue_catalog": 1, + "write_full_path_in_iceberg_metadata": 1, + }, ) def create_clickhouse_glue_table( started_cluster, node, database_name, table_name, schema, additional_settings={} ): - settings = { - "storage_catalog_type": "glue", - "storage_warehouse": "test", - "object_storage_endpoint": "http://minio:9000/warehouse-glue", - "storage_region": "us-east-1", - "storage_catalog_url" : BASE_URL - } - - settings.update(additional_settings) + settings_suffix = "" if len(additional_settings) == 0 else f"SETTINGS {",".join((k+"="+repr(v) for k, v in additional_settings.items()))}" node.query( f""" -SET allow_experimental_database_glue_catalog=true; -SET write_full_path_in_iceberg_metadata=true; CREATE TABLE {CATALOG_NAME}.`{database_name}.{table_name}` {schema} ENGINE = IcebergS3('http://minio:9000/warehouse-glue/{table_name}/', '{minio_access_key}', '{minio_secret_key}') -SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} - """ +{settings_suffix} +""", + settings={ + "allow_experimental_database_glue_catalog": 1, + "write_full_path_in_iceberg_metadata": 1, + }, ) show_result = node.query(f"SHOW DATABASE {CATALOG_NAME}") diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 167b7d22708c..6b80a898d61c 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -141,11 +141,13 @@ def create_clickhouse_iceberg_database( node.query( f""" DROP DATABASE IF EXISTS {name}; -SET allow_database_iceberg=true; -SET write_full_path_in_iceberg_metadata=1; CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}') SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} - """ + """, + settings={ + "allow_database_iceberg": 1, + "write_full_path_in_iceberg_metadata": 1, + }, ) show_result = node.query(f"SHOW DATABASE {name}") assert minio_secret_key not in show_result @@ -154,23 +156,16 @@ def create_clickhouse_iceberg_database( def create_clickhouse_iceberg_table( started_cluster, node, database_name, table_name, schema, additional_settings={} ): - settings = { - "storage_catalog_type": "rest", - "storage_warehouse": "demo", - "object_storage_endpoint": "http://minio:9000/warehouse-rest", - "storage_region": "us-east-1", - "storage_catalog_url" : BASE_URL, - } - - settings.update(additional_settings) - + settings_suffix = "" if len(additional_settings) == 0 else f"SETTINGS {",".join((k+"="+repr(v) for k, v in additional_settings.items()))}" node.query( f""" -SET allow_experimental_database_iceberg=true; -SET write_full_path_in_iceberg_metadata=1; CREATE TABLE {CATALOG_NAME}.`{database_name}.{table_name}` {schema} ENGINE = IcebergS3('http://minio:9000/warehouse-rest/{table_name}/', '{minio_access_key}', '{minio_secret_key}') -SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} - """ +{settings_suffix} + """, + settings={ + "allow_experimental_database_iceberg": 1, + "write_full_path_in_iceberg_metadata": 1, + }, ) def drop_clickhouse_iceberg_table( @@ -730,11 +725,13 @@ def test_not_specified_catalog_type(started_cluster): node.query( f""" DROP DATABASE IF EXISTS {CATALOG_NAME}; - SET allow_database_iceberg=true; - SET write_full_path_in_iceberg_metadata=1; CREATE DATABASE {CATALOG_NAME} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}') SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} - """ + """, + settings={ + "allow_database_iceberg": 1, + "write_full_path_in_iceberg_metadata": 1, + }, ) assert "" == node.query(f"SHOW TABLES FROM {CATALOG_NAME}") @@ -827,12 +824,7 @@ def test_gcs(started_cluster): node = started_cluster.instances["node1"] node.query("SYSTEM ENABLE FAILPOINT database_iceberg_gcs") - node.query( - f""" - DROP DATABASE IF EXISTS {CATALOG_NAME}; - SET allow_database_iceberg = 1; - """ - ) + node.query(f"DROP DATABASE IF EXISTS {CATALOG_NAME};") with pytest.raises(Exception) as err: node.query( @@ -842,7 +834,8 @@ def test_gcs(started_cluster): SETTINGS catalog_type = 'rest', warehouse = 'demo', - """ + """, + settings={"allow_database_iceberg": 1}, ) assert "Google cloud storage converts to S3" in str(err.value)