diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index a9c7fba4bcb5..8dad9b9003f5 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -938,9 +938,9 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t { sendRequest(endpoint, request_body); } - catch (const DB::HTTPException &) + catch (const DB::HTTPException & ex) { - return false; + throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Failed to update metadata via REST: {}", ex.displayText()); } return true; } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 8f3542165eb7..f58bf8d04dcd 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable virtual void modifyFormatSettings(FormatSettings &, const Context &) const {} + virtual bool supportsTruncate() const { return false; } + virtual void truncate(ContextPtr /*context*/, std::shared_ptr /*catalog*/, const StorageID & /*storage_id*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName()); + } + static constexpr bool supportsTotalRows() { return false; } virtual std::optional totalRows(ContextPtr) const { return {}; } static constexpr bool supportsTotalBytes() { return false; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h index 3bc4747a5f18..a09125cfbeb2 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h @@ -111,7 +111,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec); DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs); DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id); 
DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records); +DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records); DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files); +DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files); DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files); DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files); DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 58f2926cc07e..41c6b519ba49 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -9,9 +9,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -100,6 +100,7 @@ extern const int NOT_IMPLEMENTED; extern const int ICEBERG_SPECIFICATION_VIOLATION; extern const int TABLE_ALREADY_EXISTS; extern const int SUPPORT_IS_DISABLED; +extern const int INCORRECT_DATA; } namespace Setting @@ -531,6 +532,97 @@ void IcebergMetadata::mutate( ); } +void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr catalog, const StorageID & storage_id) +{ + if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value) + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Iceberg truncate is experimental. " + "To allow its usage, enable setting allow_experimental_insert_into_iceberg"); + + // Bug 1 fix: REMOVE the isTransactional() guard entirely. + // REST/transactional catalogs are the primary target of this feature. 
+ + auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context); + auto metadata_object = getMetadataJSONObject( + actual_table_state_snapshot.metadata_file_path, + object_storage, + persistent_components.metadata_cache, + context, + log, + persistent_components.metadata_compression_method, + persistent_components.table_uuid); + + // Bug 4 fix: use -1 as the Iceberg "no parent" sentinel + Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1); + + auto config_path = persistent_components.table_path; + if (!config_path.starts_with('/')) config_path = '/' + config_path; + if (!config_path.ends_with('/')) config_path += "/"; + + // Bug 3 fix: restore isTransactional flag in FileNamesGenerator + bool is_transactional = (catalog != nullptr && catalog->isTransactional()); + FileNamesGenerator filename_generator; + // Transactional catalogs (REST) require full S3 URIs — force location-based path. + // Non-transactional respects the write_full_path_in_iceberg_metadata setting. 
+ if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]) + { + String location = metadata_object->getValue(Iceberg::f_location); + if (!location.ends_with("/")) location += "/"; + filename_generator = FileNamesGenerator( + location, config_path, is_transactional, + persistent_components.metadata_compression_method, write_format); + } + else + { + filename_generator = FileNamesGenerator( + config_path, config_path, false, + persistent_components.metadata_compression_method, write_format); + } + + Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1; + filename_generator.setVersion(new_metadata_version); + + auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); + + auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata( + filename_generator, metadata_name, parent_snapshot_id, + 0, 0, 0, 0, 0, 0, std::nullopt, std::nullopt, /*is_truncate=*/true); + + auto write_settings = context->getWriteSettings(); + auto buf = object_storage->writeObject( + StoredObject(storage_manifest_list_name), + WriteMode::Rewrite, std::nullopt, + DBMS_DEFAULT_BUFFER_SIZE, write_settings); + + generateManifestList(filename_generator, metadata_object, object_storage, context, + {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false); + buf->finalize(); + + String metadata_content = dumpMetadataObjectToString(metadata_object); + writeMessageToFile(metadata_content, storage_metadata_name, object_storage, + context, "*", "", persistent_components.metadata_compression_method); + + // Bug 2 fix: restore the catalog commit, matching the pattern from IcebergWrites.cpp + if (catalog) + { + // Build the catalog-visible path (blob URI for transactional, bare path otherwise) + String catalog_filename = metadata_name; + if (is_transactional) + { + const String blob_storage_type_name = 
Poco::toLower(String(magic_enum::enum_name(object_storage->getType()))); + const auto blob_storage_namespace_name = persistent_components.table_path; + catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; + } + + const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); + // Pass metadata_object (not new_snapshot) — matches the fix already applied in + // IcebergWrites.cpp and Mutations.cpp + if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata_object)) + throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to commit Iceberg truncate update to catalog."); + } +} + void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands) { for (const auto & command : commands) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index dcb2b91131bd..9f1b0408d156 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -83,6 +83,9 @@ class IcebergMetadata : public IDataLakeMetadata bool supportsUpdate() const override { return true; } bool supportsWrites() const override { return true; } bool supportsParallelInsert() const override { return true; } + bool supportsTruncate() const override { return true; } + + void truncate(ContextPtr context, std::shared_ptr catalog, const StorageID & storage_id) override; IcebergHistory getHistory(ContextPtr local_context) const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 3809dfcd38c2..b05bdd0254fa 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -451,6 +451,53 @@ void generateManifestList( else throw Exception(ErrorCodes::BAD_ARGUMENTS, 
"Unknown iceberg version {}", version); + // For empty manifest list (e.g. TRUNCATE), write a valid Avro container + // file manually so we can embed the full schema JSON with field-ids intact, + // without triggering the DataFileWriter constructor's eager writeHeader() + // which commits encoder state before we can override avro.schema. + if (manifest_entry_names.empty() && !use_previous_snapshots) + { + auto write_avro_long = [](WriteBuffer & out, int64_t val) + { + uint64_t n = (static_cast(val) << 1) ^ static_cast(val >> 63); + while (n & ~0x7fULL) + { + char c = static_cast((n & 0x7f) | 0x80); + out.write(&c, 1); + n >>= 7; + } + char c = static_cast(n); + out.write(&c, 1); + }; + + auto write_avro_bytes = [&](WriteBuffer & out, const String & s) + { + write_avro_long(out, static_cast(s.size())); + out.write(s.data(), s.size()); + }; + + // Avro Object Container File header + buf.write("Obj\x01", 4); + + // Metadata map: 2 entries (codec + schema) + write_avro_long(buf, 2); + write_avro_bytes(buf, "avro.codec"); + write_avro_bytes(buf, "null"); + write_avro_bytes(buf, "avro.schema"); + write_avro_bytes(buf, schema_representation); // full JSON, field-ids intact + + // End of metadata map + write_avro_long(buf, 0); + + // Sync marker (16 zero bytes — valid, no data blocks follow) + static const char sync_marker[16] = {}; + buf.write(sync_marker, 16); + + // No data blocks for empty manifest list + buf.finalize(); + return; + } + auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT auto adapter = std::make_unique(buf); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp index c7bce897ea6c..184c6a7f9359 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp @@ -113,7 +113,8 @@ MetadataGenerator::NextMetadataResult 
MetadataGenerator::generateNextMetadata( Int32 added_delete_files, Int32 num_deleted_rows, std::optional user_defined_snapshot_id, - std::optional user_defined_timestamp) + std::optional user_defined_timestamp, + bool is_truncate) { int format_version = metadata_object->getValue(Iceberg::f_format_version); Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object; @@ -137,7 +138,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( auto parent_snapshot = getParentSnapshot(parent_snapshot_id); Poco::JSON::Object::Ptr summary = new Poco::JSON::Object; - if (num_deleted_rows == 0) + if (is_truncate) + { + summary->set(Iceberg::f_operation, Iceberg::f_overwrite); + Int64 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue(Iceberg::f_total_records)) : 0; + Int64 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue(Iceberg::f_total_data_files)) : 0; + + summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records)); + summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files)); + } + else if (num_deleted_rows == 0) { summary->set(Iceberg::f_operation, Iceberg::f_append); summary->set(Iceberg::f_added_data_files, std::to_string(added_files)); @@ -157,7 +167,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata( auto sum_with_parent_snapshot = [&](const char * field_name, Int32 snapshot_value) { - Int32 prev_value = parent_snapshot ? 
std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue(field_name)) : 0; + if (is_truncate) + { + summary->set(field_name, std::to_string(0)); + return; + } + Int64 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? std::stoll(parent_snapshot->getObject(Iceberg::f_summary)->getValue(field_name)) : 0; + summary->set(field_name, std::to_string(prev_value + snapshot_value)); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h index a4cbbbc4434e..035747dafa14 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h @@ -30,7 +30,8 @@ class MetadataGenerator Int32 added_delete_files, Int32 num_deleted_rows, std::optional user_defined_snapshot_id = std::nullopt, - std::optional user_defined_timestamp = std::nullopt); + std::optional user_defined_timestamp = std::nullopt, + bool is_truncate = false); void generateAddColumnMetadata(const String & column_name, DataTypePtr type); void generateDropColumnMetadata(const String & column_name); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index deeb05a49102..910ae11a9b7c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -517,7 +517,7 @@ static bool writeMetadataFiles( catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name; const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName()); - if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) + if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata)) { cleanup(); return false; diff 
--git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index f04be5d5f946..7fe3ce25debe 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans void StorageObjectStorage::truncate( const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, - ContextPtr /* context */, + ContextPtr context, TableExclusiveLockHolder & /* table_holder */) { const auto path = configuration->getRawPath(); @@ -639,8 +639,12 @@ void StorageObjectStorage::truncate( if (configuration->isDataLakeConfiguration()) { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "Truncate is not supported for data lake engine"); + auto * data_lake_metadata = getExternalMetadata(context); + if (!data_lake_metadata || !data_lake_metadata->supportsTruncate()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine"); + + data_lake_metadata->truncate(context, catalog, getStorageID()); + return; } if (path.hasGlobs()) diff --git a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py new file mode 100644 index 000000000000..40c5e8cff33f --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +from pyiceberg.catalog import load_catalog +from helpers.config_cluster import minio_secret_key, minio_access_key +import uuid +import pyarrow as pa +from pyiceberg.schema import Schema, NestedField +from pyiceberg.types import LongType, StringType +from pyiceberg.partitioning import PartitionSpec + +BASE_URL_LOCAL_RAW = "http://localhost:8182" +CATALOG_NAME = "demo" + +def load_catalog_impl(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": 
BASE_URL_LOCAL_RAW, + "type": "rest", + "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", + "s3.access-key-id": minio_access_key, + "s3.secret-access-key": minio_secret_key, + }, + ) + + +def test_iceberg_truncate(started_cluster_iceberg_no_spark): + instance = started_cluster_iceberg_no_spark.instances["node1"] + catalog = load_catalog_impl(started_cluster_iceberg_no_spark) + + # 1. Setup PyIceberg Namespace and Table + namespace = f"clickhouse_truncate_{uuid.uuid4().hex}" + catalog.create_namespace(namespace) + + schema = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), + NestedField(field_id=2, name="val", field_type=StringType(), required=False), + ) + + table_name = "test_truncate" + table = catalog.create_table( + identifier=f"{namespace}.{table_name}", + schema=schema, + location=f"s3://warehouse-rest/{namespace}.{table_name}", + partition_spec=PartitionSpec(), + ) + + # 2. Populate Data + df = pa.Table.from_pylist([ + {"id": 1, "val": "A"}, + {"id": 2, "val": "B"}, + {"id": 3, "val": "C"}, + ]) + table.append(df) + + # Validate data is in iceberg + assert len(table.scan().to_arrow()) == 3 + + # 3. Setup ClickHouse Database + instance.query(f"DROP DATABASE IF EXISTS {namespace}") + instance.query( + f""" + CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}') + SETTINGS + catalog_type='rest', + warehouse='demo', + storage_endpoint='http://minio:9000/warehouse-rest'; + """, + settings={"allow_database_iceberg": 1} + ) + + # 4. Formulate the ClickHouse Table Identifier + # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly + ch_table_identifier = f"`{namespace}.{table_name}`" + + # Assert data from ClickHouse + assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3 + + # 5. 
Truncate Table via ClickHouse + instance.query( + f"TRUNCATE TABLE {namespace}.{ch_table_identifier}", + settings={"allow_experimental_insert_into_iceberg": 1} + ) + + # Assert truncated from ClickHouse + assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0 + + # 6. Cross-Engine Validation using PyIceberg + # Refresh table state to grab the new v.metadata.json you generated + table.refresh() + + # Assert PyIceberg reads the empty snapshot successfully + assert len(table.scan().to_arrow()) == 0 + + # 7. Verify Writable State + # Append a new row to ensure truncation didn't break table state + new_df = pa.Table.from_pylist([ + {"id": 4, "val": "D"} + ]) + table.append(new_df) + + # Assert new row count via ClickHouse + assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 1 + + # Cleanup + instance.query(f"DROP DATABASE {namespace}") \ No newline at end of file