Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace Setting
extern const SettingsInt64 delta_lake_snapshot_end_version;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
extern const SettingsBool allow_experimental_iceberg_read_optimization;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
Expand Down Expand Up @@ -111,7 +112,9 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
configuration->initPartitionStrategy(partition_by, columns_in_table_or_function_definition, context_);

const bool need_resolve_columns_or_format = columns_in_table_or_function_definition.empty() || (configuration->getFormat() == "auto");
const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && catalog;
const bool do_lazy_init =
(context_->getSettingsRef()[Setting::object_storage_remote_initiator] || lazy_init)
&& !need_resolve_columns_or_format && catalog;

auto log = getLogger("StorageObjectStorageCluster");

Expand Down Expand Up @@ -543,6 +546,9 @@ void StorageObjectStorageCluster::updateExternalDynamicMetadataIfExists(ContextP
if (!configuration->isDataLakeConfiguration())
return;

if (query_context->getSettingsRef()[Setting::object_storage_remote_initiator])
return;
Comment on lines +549 to +550
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Refresh metadata even when using remote initiator

Returning early when object_storage_remote_initiator is enabled skips all dynamic metadata refresh on the initiating node, so its in-memory schema/state can become stale after Iceberg/DeltaLake snapshot changes. The initiator still performs query analysis and builds storage_snapshot locally (e.g., before rewriting to remote(...)), so a query that references newly added columns can fail at analysis with unknown-column errors even though the remote node has up-to-date metadata. Previously this path always called configuration->update, so this is a regression for sessions that keep object_storage_remote_initiator=1 across evolving lakehouse tables.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

@ianton-ru ianton-ru May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. I've added a test (currently failing); this still needs to be fixed somehow.


/// Always force an update to pick up the latest snapshot version.
/// Using if_not_updated_before=true would leave latest_snapshot_version
/// stale from the first query and silently omit new files.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pytest

from helpers.iceberg_utils import (
get_uuid_str,
create_iceberg_table,
execute_spark_query_general,
)


@pytest.mark.parametrize("storage_type", ["s3"])
def test_remote_initiator_with_changed_table(started_cluster_iceberg_with_spark, storage_type):
    """Regression test for object_storage_remote_initiator=1 with an evolving
    Iceberg table.

    With the remote-initiator setting enabled, the initiating node skips the
    dynamic-metadata refresh path.  If the table schema changes between
    queries (here: Spark adds a column and inserts a row using it), the
    initiator must still see the new column during query analysis instead of
    failing with an unknown-column error from its stale cached schema.

    :param started_cluster_iceberg_with_spark: cluster fixture providing
        ClickHouse instances and a Spark session.
    :param storage_type: object storage backend under test (currently s3).
    """
    instance = started_cluster_iceberg_with_spark.instances["node1"]
    spark = started_cluster_iceberg_with_spark.spark_session
    # Unique suffix so concurrent/repeated test runs never collide on the
    # same table in shared object storage.
    TABLE_NAME = "test_remote_initiator_with_changed_table_" + get_uuid_str()

    def execute_spark_query(query: str):
        # Shorthand bound to this test's table and storage backend.
        return execute_spark_query_general(
            spark,
            started_cluster_iceberg_with_spark,
            storage_type,
            TABLE_NAME,
            query,
        )

    execute_spark_query(
        f"""
            CREATE TABLE {TABLE_NAME} (
                tag INT,
                number INT
            )
            USING iceberg
            PARTITIONED BY (identity(tag))
            OPTIONS('format-version'='2')
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (1, 1)
        """
    )

    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)

    # First query with the remote initiator: establishes the (potentially
    # cached) schema/snapshot state on the initiating node.
    res = instance.query(f"""
        SELECT *
        FROM {TABLE_NAME}
        WHERE number=1
        SETTINGS
        object_storage_remote_initiator=1,
        object_storage_cluster='cluster_simple'
    """)

    assert res == "1\t1\n"

    # Evolve the schema out-of-band via Spark: add a column, then write a
    # row that populates it.
    execute_spark_query(
        f"""
        ALTER TABLE {TABLE_NAME} ADD COLUMN number2 INT AFTER number;
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (2, 2, 2)
        """
    )

    # Second query filters on the newly added column; this fails with an
    # unknown-column error if the initiator kept the stale schema.
    res = instance.query(f"""
        SELECT *
        FROM {TABLE_NAME}
        WHERE number2=2
        SETTINGS
        object_storage_remote_initiator=1,
        object_storage_cluster='cluster_simple'
    """)

    assert res == "2\t2\t2\n"
Loading