2 changes: 2 additions & 0 deletions ci/docker/integration/runner/Dockerfile
@@ -80,6 +80,8 @@ org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.amazonaws:aws-java-sdk-bundle:1.12.262,\
org.apache.hadoop:hadoop-azure:3.3.4,\
com.microsoft.azure:azure-storage:8.6.6,\
org.apache.spark:spark-avro_2.12:3.5.1"\
&& /spark-3.5.5-bin-hadoop3/bin/spark-shell --packages "$packages" \
&& find /root/.ivy2/ -name '*.jar' -exec ln -sf {} /spark-3.5.5-bin-hadoop3/jars/ \;
82 changes: 82 additions & 0 deletions docs/en/sql-reference/table-functions/iceberg.md
@@ -649,6 +649,88 @@ GRANT ALTER TABLE ON my_iceberg_table TO my_user;
- The catalog's own authorization (REST catalog auth, AWS Glue IAM, etc.) is enforced independently when ClickHouse updates the metadata
:::

### Remove Orphan Files {#iceberg-remove-orphan-files}

Orphan files are files on storage that are not referenced by any snapshot in the Iceberg table metadata. They accumulate from failed writes, partial cleanup after compaction, and interrupted operations, causing unbounded storage growth. The `remove_orphan_files` command identifies and removes these orphan files.

**Syntax:**

```sql
-- Positional form: single unnamed older_than argument
ALTER TABLE iceberg_table EXECUTE remove_orphan_files('timestamp')

-- Named form
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(
older_than = 'timestamp',
location = 'path',
dry_run = 0|1
)

-- No arguments: use all defaults (older_than = 3 days ago)
ALTER TABLE iceberg_table EXECUTE remove_orphan_files()
```

**Parameters:**

| Parameter | Type | Default | Description |
|---|---|---|---|
| `older_than` | `String` (timestamp) | 3 days ago (configurable via `iceberg_orphan_files_older_than_seconds`) | Only consider files with a last-modified time older than this timestamp as orphan candidates. Safety guard against deleting files from in-progress writes. |
| `location` | `String` | Table location | Restrict the scan to a specific subdirectory under the table location (e.g., `'data/'` or `'metadata/'`). |
| `dry_run` | `UInt64` | `0` | When `1`, identify orphan files and return the result summary without actually deleting anything. |

**Examples:**

```sql
-- Remove orphan files older than a specific timestamp
ALTER TABLE iceberg_table EXECUTE remove_orphan_files('2026-03-01 00:00:00');

-- Dry run: preview which files would be deleted
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(dry_run = 1);

-- Scan only the data directory
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(
older_than = '2026-03-01 00:00:00',
location = 'data/'
);

-- Combine positional older_than with named arguments
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(
'2026-03-01 00:00:00',
dry_run = 1
);
```

**Output:**

The command returns a table with `metric_name` and `metric_value` columns showing the count of deleted files (or files that would be deleted, in `dry_run` mode) by category. Categories are assigned using best-effort heuristics based on file naming conventions; files that match no specific pattern are counted under `deleted_data_files_count`:

| metric_name | metric_value |
|---|---|
| deleted_data_files_count | 5 |
| deleted_position_delete_files_count | 2 |
| deleted_equality_delete_files_count | 0 |
| deleted_manifest_files_count | 3 |
| deleted_manifest_lists_count | 1 |
| deleted_metadata_files_count | 0 |
| deleted_statistics_files_count | 0 |
| skipped_missing_metadata_count | 0 |
| failed_deletions_count | 0 |

**Settings:**

| Setting | Type | Default | Description |
|---|---|---|---|
| `allow_iceberg_remove_orphan_files` | `Bool` | `false` | Gate setting to enable the feature (experimental). |
| `iceberg_orphan_files_older_than_seconds` | `UInt64` | `259200` (3 days) | Default `older_than` threshold in seconds when the argument is omitted. |

:::note
- **Requires Iceberg format version 2 (or higher).** Version 1 tables are rejected because they lack `manifest-list` pointers in snapshots, which are needed to safely determine the reachable file set. Running the command on a v1 table returns a `BAD_ARGUMENTS` error.
- Requires both `allow_insert_into_iceberg` and `allow_iceberg_remove_orphan_files` settings to be enabled
- It is recommended to run `expire_snapshots` before `remove_orphan_files` so that files uniquely referenced by expired snapshots are cleaned up first
- Use `dry_run = 1` to preview orphan files before deletion
- The `older_than` threshold protects against deleting files from in-progress writes — the default 3-day threshold provides a generous safety margin
:::
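
Putting the recommendations above together, a conservative cleanup pass might look like the following sketch. The timestamp is illustrative, and the `older_than` argument to `expire_snapshots` is an assumption here; check that command's own documentation for its exact signature:

```sql
-- 1. Expire old snapshots first, so files uniquely referenced by them
--    become orphans and can be cleaned up in the next step
ALTER TABLE iceberg_table EXECUTE expire_snapshots('2026-03-01 00:00:00');

-- 2. Preview what remove_orphan_files would delete
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(
    older_than = '2026-03-01 00:00:00',
    dry_run = 1
);

-- 3. If the preview looks right, run the actual deletion
ALTER TABLE iceberg_table EXECUTE remove_orphan_files('2026-03-01 00:00:00');
```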

## Altinity Antalya branch

### Specify storage type in arguments
6 changes: 6 additions & 0 deletions src/Core/Settings.cpp
@@ -7754,6 +7754,12 @@ Allow to execute `insert` queries into iceberg.
)", BETA, allow_experimental_insert_into_iceberg) \
DECLARE(Bool, allow_experimental_iceberg_compaction, false, R"(
Allow to explicitly use 'OPTIMIZE' for iceberg tables.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_iceberg_remove_orphan_files, false, R"(
Allow to use 'ALTER TABLE ... EXECUTE remove_orphan_files()' for iceberg tables.
)", EXPERIMENTAL) \
DECLARE(UInt64, iceberg_orphan_files_older_than_seconds, 259200, R"(
Default age threshold in seconds for orphan file removal in Iceberg tables. Files newer than this are not considered orphans. Used when the older_than argument is omitted from the remove_orphan_files() procedure call. Default is 259200 (3 days).
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_expire_snapshots, false, R"(
Allow to execute experimental Iceberg command `ALTER TABLE ... EXECUTE expire_snapshots`.
24 changes: 24 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -42,6 +42,30 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"allow_iceberg_remove_orphan_files", false, false, "New setting to gate Iceberg orphan file removal"},
{"iceberg_orphan_files_older_than_seconds", 259200, 259200, "New setting for default orphan file age threshold"},
{"output_format_arrow_unsupported_types_as_binary", false, true, "New setting to convert unsupported CH types to arrow binary instead of UNKNOWN_TYPE exception."},
{"output_format_parquet_unsupported_types_as_binary", false, false, "New setting to convert unsupported CH types to parquet (arrow) binary instead of UNKNOWN_TYPE exception."},
{"asterisk_include_virtual_columns", false, false, "New setting"},
{"max_wkb_geometry_elements", 1'000'000, 1'000'000, "New setting to limit element counts in WKB geometry parsing, preventing excessive memory allocation on malformed data."},
{"max_rand_distribution_trials", 1'000'000'000, 1'000'000'000, "New setting to limit trial counts in random distribution functions, preventing hangs with extreme inputs."},
{"max_rand_distribution_parameter", 1e6, 1e6, "New setting to limit shape parameters in random distribution functions, preventing hangs with extreme inputs."},
{"optimize_truncate_order_by_after_group_by_keys", false, true, "Remove trailing ORDER BY elements once all GROUP BY keys are covered in the ORDER BY prefix."},
{"use_statistics_for_part_pruning", false, true, "New setting to use statistics for part pruning during query execution."},
{"distributed_index_analysis_only_on_coordinator", false, false, "New setting."},
{"query_plan_optimize_join_order_randomize", 0, 0, "New setting to randomize join order statistics for testing."},
{"enable_materialized_cte", false, false, "New setting"},
{"use_strict_insert_block_limits", false, false, "New setting to use strict min and max insert bounds on inserts. When min < max, max limits take precedence."},
{"finalize_projection_parts_synchronously", false, false, "New setting to finalize projection parts synchronously during INSERT to reduce peak memory usage."},
{"read_in_order_use_virtual_row_per_block", false, false, "Emit virtual row after each block during read-in-order to allow more frequent source reprioritization in MergingSortedTransform."},
{"distributed_plan_prefer_replicas_over_workers", false, false, "New setting to serialize distributed plan for replicas"},
{"use_text_index_like_evaluation_by_dictionary_scan", true, true, "New setting"},
{"text_index_like_min_pattern_length", 4, 4, "New setting"},
{"text_index_like_max_postings_to_read", 50, 50, "New setting"},
{"analyzer_inline_views", false, false, "New setting"},
{"highlight_max_matches_per_row", 10000, 10000, "New setting to limit the number of highlight matches per row to protect against excessive memory usage."},
{"materialize_statistics_on_insert", true, false, "Disable building statistics on INSERT by default, rely on merges instead"},
{"enable_join_transitive_predicates", false, false, "New setting to infer transitive equi-join predicates for join order optimization."},
});
addSettingsChanges(settings_changes_history, "26.3",
{
28 changes: 11 additions & 17 deletions src/Databases/DataLake/GlueCatalog.cpp
@@ -38,19 +38,20 @@
#include <DataTypes/DataTypesNumber.h>


#include <IO/S3/Credentials.h>
#include <IO/S3/Client.h>
#include <IO/S3Settings.h>
#include <Databases/DataLake/Common.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Databases/DataLake/Common.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <IO/S3/Client.h>
#include <IO/S3/Credentials.h>
#include <IO/S3Settings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Common/ProxyConfigurationResolverProvider.h>

namespace DB::ErrorCodes
{
@@ -554,14 +555,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
try
{
auto [metadata_version, metadata_path, compression_method] = DB::Iceberg::getLatestOrExplicitMetadataFileAndVersion(
object_storage,
table_path,
*storage_settings,
nullptr,
getContext(),
log.get(),
std::nullopt
);
object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None);

LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location);

1 change: 1 addition & 0 deletions src/Databases/enableAllExperimentalSettings.cpp
@@ -59,6 +59,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("allow_experimental_lightweight_update", 1);
context->setSetting("allow_insert_into_iceberg", 1);
context->setSetting("allow_experimental_iceberg_compaction", 1);
context->setSetting("allow_iceberg_remove_orphan_files", 1);
context->setSetting("allow_experimental_expire_snapshots", 1);
context->setSetting("allow_experimental_delta_lake_writes", 1);
context->setSetting("allow_dynamic_type_in_join_keys", 1);
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
@@ -380,7 +380,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran

ObjectMetadata ReadBufferFromAzureBlobStorage::getObjectMetadataFromTheLastRequest() const
{
if (last_object_metadata.get()->has_value())
if (!last_object_metadata.get()->has_value())
throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests");

return last_object_metadata.get()->value();
4 changes: 2 additions & 2 deletions src/Interpreters/IcebergMetadataLog.cpp
@@ -84,7 +84,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status)
{
@@ -108,7 +108,7 @@
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
.file_path = file_path.serialize(),
.metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
3 changes: 2 additions & 1 deletion src/Interpreters/IcebergMetadataLog.h
@@ -3,6 +3,7 @@
#include <Core/SettingsEnums.h>
#include <Interpreters/SystemLog.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>

namespace DB
@@ -33,7 +34,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status);

@@ -4,6 +4,7 @@
#include <IO/WriteBufferFromString.h>
#include <Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Common/UniqueLock.h>
@@ -35,7 +36,9 @@ namespace DB::Iceberg
using namespace DB;

AvroForIcebergDeserializer::AvroForIcebergDeserializer(
std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
std::unique_ptr<ReadBufferFromFileBase> buffer_,
const IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings)
try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
@@ -156,7 +159,8 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}


const auto file_path_key = getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>();
const auto file_path_key = IcebergPathFromMetadata::deserialize(
getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>());
/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
/// ...
@@ -257,16 +261,18 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
case FileContentType::POSITION_DELETE: {
/// reference_file_path can be absent in schema for some reason, though it is present in specification: https://iceberg.apache.org/spec/#manifests
std::optional<String> lower_reference_data_file_path = std::nullopt;
std::optional<String> upper_reference_data_file_path = std::nullopt;
std::optional<Iceberg::IcebergPathFromMetadata> lower_reference_data_file_path;
std::optional<Iceberg::IcebergPathFromMetadata> upper_reference_data_file_path;
bool bounds_set_by_referenced_data_file = false;
if (hasPath(c_data_file_referenced_data_file))
{
Field reference_file_path_field = getValueFromRowByName(row_index, c_data_file_referenced_data_file);
if (!reference_file_path_field.isNull())
{
lower_reference_data_file_path = reference_file_path_field.safeGet<String>();
upper_reference_data_file_path = reference_file_path_field.safeGet<String>();
lower_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
upper_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
bounds_set_by_referenced_data_file = true;
}
}
@@ -277,9 +283,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
{
auto & [lower, upper] = it->second;
if (!lower.isNull())
lower_reference_data_file_path = lower.safeGet<String>();
lower_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(lower.safeGet<String>()));
if (!upper.isNull())
upper_reference_data_file_path = upper.safeGet<String>();
upper_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(upper.safeGet<String>()));
}
}
return std::make_shared<const ParsedManifestFileEntry>(
@@ -9,6 +9,7 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Common/SharedMutex.h>


@@ -36,7 +37,7 @@ class AvroForIcebergDeserializer
{
private:
std::unique_ptr<DB::ReadBufferFromFileBase> buffer;
std::string manifest_file_path;
Iceberg::IcebergPathFromMetadata manifest_file_path;
DB::ColumnPtr parsed_column;
std::shared_ptr<const DB::DataTypeTuple> parsed_column_data_type;
mutable std::optional<ColumnsWithTypeAndName> cache_parsed_columns TSA_GUARDED_BY(cache_mutex);
@@ -61,7 +62,7 @@
public:
AvroForIcebergDeserializer(
std::unique_ptr<DB::ReadBufferFromFileBase> buffer_,
const std::string & manifest_file_path_,
const Iceberg::IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings);

size_t rows() const;