diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8828cf70f1df..bd3b4f688f18 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -40,6 +40,7 @@ namespace Setting extern const SettingsInt64 delta_lake_snapshot_end_version; extern const SettingsUInt64 lock_object_storage_task_distribution_ms; extern const SettingsBool allow_experimental_iceberg_read_optimization; + extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes @@ -111,7 +112,9 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( configuration->initPartitionStrategy(partition_by, columns_in_table_or_function_definition, context_); const bool need_resolve_columns_or_format = columns_in_table_or_function_definition.empty() || (configuration->getFormat() == "auto"); - const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && catalog; + const bool do_lazy_init = + (context_->getSettingsRef()[Setting::object_storage_remote_initiator] || lazy_init) + && !need_resolve_columns_or_format && catalog; auto log = getLogger("StorageObjectStorageCluster"); @@ -543,6 +546,9 @@ void StorageObjectStorageCluster::updateExternalDynamicMetadataIfExists(ContextP if (!configuration->isDataLakeConfiguration()) return; + if (query_context->getSettingsRef()[Setting::object_storage_remote_initiator]) + return; + /// Always force an update to pick up the latest snapshot version. /// Using if_not_updated_before=true would leave latest_snapshot_version /// stale from the first query and silently omit new files. 
import pytest

from helpers.iceberg_utils import (
    get_uuid_str,
    create_iceberg_table,
    execute_spark_query_general,
)


@pytest.mark.parametrize("storage_type", ["s3"])
def test_remote_initiator_with_changed_table(started_cluster_iceberg_with_spark, storage_type):
    """Verify reads with object_storage_remote_initiator=1 over a cluster.

    Scenario:
      1. Spark creates a partitioned Iceberg v2 table and inserts one row.
      2. ClickHouse reads it with object_storage_remote_initiator=1 and
         object_storage_cluster='cluster_simple'.
      3. Spark evolves the schema (ADD COLUMN) and inserts a second row.
      4. ClickHouse reads again with the same settings; the result must
         reflect the new column — i.e. the remote initiator must not serve
         a stale snapshot/schema cached from the first query.

    :param started_cluster_iceberg_with_spark: fixture providing the CH
        cluster and a Spark session wired to the same object storage.
    :param storage_type: object storage backend under test (only "s3").
    """
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    # Unique table name so concurrent/repeated runs don't collide.
    TABLE_NAME = "test_remote_initiator_with_changed_table_" + get_uuid_str()

    def execute_spark_query(query: str):
        # Thin wrapper binding the shared Spark/session/table context.
        return execute_spark_query_general(
            spark,
            started_cluster_iceberg_with_spark,
            storage_type,
            TABLE_NAME,
            query,
        )

    execute_spark_query(
        f"""
        CREATE TABLE {TABLE_NAME} (
            tag INT,
            number INT
        )
        USING iceberg
        PARTITIONED BY (identity(tag))
        OPTIONS('format-version'='2')
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
            (1, 1)
        """
    )

    # Register the Iceberg table in ClickHouse before querying it.
    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)

    res = instance.query(f"""
        SELECT *
        FROM {TABLE_NAME}
        WHERE number=1
        SETTINGS
            object_storage_remote_initiator=1,
            object_storage_cluster='cluster_simple'
    """)

    assert res == "1\t1\n"

    # Evolve the schema: a new column appears after `number`.
    execute_spark_query(
        f"""
        ALTER TABLE {TABLE_NAME} ADD COLUMN number2 INT AFTER number;
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
            (2, 2, 2)
        """
    )

    # The second read must see the evolved schema (three columns), proving
    # the remote initiator picked up the new Iceberg snapshot/metadata.
    res = instance.query(f"""
        SELECT *
        FROM {TABLE_NAME}
        WHERE number2=2
        SETTINGS
            object_storage_remote_initiator=1,
            object_storage_cluster='cluster_simple'
    """)

    assert res == "2\t2\t2\n"