diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index cc86a875a39a..747440a6b3da 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -138,7 +138,8 @@ static struct InitFiu REGULAR(rmt_delay_execute_drop_range) \ REGULAR(rmt_delay_commit_part) \ ONCE(local_object_storage_network_error_during_remove) \ - ONCE(parallel_replicas_check_read_mode_always) + ONCE(parallel_replicas_check_read_mode_always) \ + REGULAR(lightweight_show_tables) namespace FailPoints { diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index b8a29232d7c9..85518b596ff5 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -41,6 +41,7 @@ #include #include #include +#include <Common/FailPoint.h> namespace DB { @@ -97,6 +98,11 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace FailPoints +{ + extern const char lightweight_show_tables[]; +} + DatabaseDataLake::DatabaseDataLake( const std::string & database_name_, const std::string & url_, @@ -448,6 +454,12 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con auto catalog = getCatalog(); auto table_metadata = DataLake::TableMetadata().withSchema().withLocation().withDataLakeSpecificProperties(); + /// This is added to test that lightweight queries like 'SHOW TABLES' don't end up fetching the table + fiu_do_on(FailPoints::lightweight_show_tables, + { + std::this_thread::sleep_for(std::chrono::seconds(10)); + }); + const bool with_vended_credentials = settings[DatabaseDataLakeSetting::vended_credentials].value; if (!lightweight && with_vended_credentials) table_metadata = table_metadata.withStorageCredentials(); @@ -665,15 +677,33 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator( pool.scheduleOrThrow( [this, table_name, skip_not_loaded, context_, promise=promises.back()]() mutable { + StoragePtr storage = nullptr; try { - auto storage = tryGetTableImpl(table_name, 
context_, false, skip_not_loaded); - promise->set_value(storage); + LOG_INFO(log, "Get table information for table {}", table_name); + storage = tryGetTableImpl(table_name, context_, false, skip_not_loaded); } catch (...) { - promise->set_exception(std::current_exception()); + if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access]) + { + auto error_code = getCurrentExceptionCode(); + auto error_message = getCurrentExceptionMessage(true, false, true, true); + auto enhanced_message = fmt::format( + "Received error {} while fetching table metadata for existing table '{}'. " + "If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}", + error_code, + table_name, + error_message); + promise->set_exception(std::make_exception_ptr(Exception::createRuntime( + error_code, + enhanced_message))); + return; + } + else + tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name)); } + promise->set_value(storage); }); } catch (...) @@ -701,15 +731,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator( return std::make_unique(tables, getDatabaseName()); } -DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator( - ContextPtr context_, +std::vector DatabaseDataLake::getLightweightTablesIterator( + ContextPtr /*context_*/, const FilterByNameFunction & filter_by_table_name, - bool skip_not_loaded) const + bool /*skip_not_loaded*/) const { - Tables tables; - auto catalog = getCatalog(); DB::Names iceberg_tables; + std::vector result; /// Do not throw here, because this might be, for example, a query to system.tables. /// It must not fail on case of some datalake error. 
@@ -722,84 +751,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator( tryLogCurrentException(__PRETTY_FUNCTION__); } - auto & pool = Context::getGlobalContextInstance()->getIcebergCatalogThreadpool(); - - std::vector>> promises; - std::vector> futures; - - for (const auto & table_name : iceberg_tables) - { - if (filter_by_table_name && !filter_by_table_name(table_name)) - continue; - - /// NOTE: There are one million of different ways how we can receive - /// weird response from different catalogs. tryGetTableImpl will not - /// throw only in case of expected errors, but sometimes we can receive - /// completely unexpected results for some objects which can be stored - /// in catalogs. But this function is used in SHOW TABLES query which - /// should return at least properly described tables. That is why we - /// have this try/catch here. - try - { - promises.emplace_back(std::make_shared>()); - futures.emplace_back(promises.back()->get_future()); - - pool.scheduleOrThrow( - [this, table_name, skip_not_loaded, context_, promise = promises.back()] mutable - { - StoragePtr storage = nullptr; - try - { - storage = tryGetTableImpl(table_name, context_, true, skip_not_loaded); - } - catch (...) - { - if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access]) - { - auto error_code = getCurrentExceptionCode(); - auto error_message = getCurrentExceptionMessage(true, false, true, true); - auto enhanced_message = fmt::format( - "Received error {} while fetching table metadata for existing table '{}'. " - "If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}", - error_code, - table_name, - error_message); - promise->set_exception(std::make_exception_ptr(Exception::createRuntime( - error_code, - enhanced_message))); - return; - } - else - tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name)); - } - promise->set_value(storage); - }); - } - catch (...) 
- { - promises.back()->set_value(nullptr); - tryLogCurrentException(log, "Failed to schedule task into pool"); - } - } - - for (const auto & future : futures) - future.wait(); - - size_t future_index = 0; for (const auto & table_name : iceberg_tables) { if (filter_by_table_name && !filter_by_table_name(table_name)) continue; - - if (auto storage_ptr = futures[future_index].get(); storage_ptr != nullptr) - { - [[maybe_unused]] bool inserted = tables.emplace(table_name, storage_ptr).second; - chassert(inserted); - } - future_index++; + result.emplace_back(table_name); } - return std::make_unique(tables, getDatabaseName()); + return result; } ASTPtr DatabaseDataLake::getCreateDatabaseQueryImpl() const diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h index 736b951b0b21..efaf9f9db316 100644 --- a/src/Databases/DataLake/DatabaseDataLake.h +++ b/src/Databases/DataLake/DatabaseDataLake.h @@ -42,7 +42,7 @@ class DatabaseDataLake final : public IDatabase, WithContext bool skip_not_loaded) const override; /// skip_not_loaded flag ignores all non-iceberg tables - DatabaseTablesIteratorPtr getLightweightTablesIterator( + std::vector getLightweightTablesIterator( ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override; diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 438ef2cf8998..805a06843e87 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -38,6 +38,13 @@ struct ParsedTablesMetadata; struct QualifiedTableName; class IRestoreCoordination; +/// This structure is returned when getLightweightTablesIterator is called +/// It contains basic details of the table, currently only the table name +struct LightWeightTableDetails +{ + String name; +}; + class IDatabaseTablesIterator { public: @@ -271,9 +278,17 @@ class IDatabase : public std::enable_shared_from_this /// Same as above, but may return non-fully initialized StoragePtr objects 
which are not suitable for reading. /// Useful for queries like "SHOW TABLES" - virtual DatabaseTablesIteratorPtr getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT + virtual std::vector getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT { - return getTablesIterator(context, filter_by_table_name, skip_not_loaded); + std::vector result; + + for (auto iterator = getTablesIterator(context, filter_by_table_name, skip_not_loaded); iterator->isValid(); iterator->next()) + { + if (const auto & table = iterator->table()) + result.emplace_back(iterator->name()); + } + + return result; } virtual DatabaseDetachedTablesSnapshotIteratorPtr getDetachedTablesIterator( diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 8993c8a025bf..201b2bea43f5 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -1351,7 +1351,7 @@ bool InterpreterSystemQuery::dropStorageReplica(const String & query_replica, co void InterpreterSystemQuery::dropStorageReplicasFromDatabase(const String & query_replica, DatabasePtr database) { - for (auto iterator = database->getLightweightTablesIterator(getContext()); iterator->isValid(); iterator->next()) + for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next()) dropStorageReplica(query_replica, iterator->table()); LOG_TRACE(log, "Dropped storage replica from {} of database {}", query_replica, backQuoteIfNeed(database->getDatabaseName())); diff --git a/src/Storages/System/StorageSystemColumns.cpp b/src/Storages/System/StorageSystemColumns.cpp index 14b15ee50a4b..8392e16a8a8e 100644 --- a/src/Storages/System/StorageSystemColumns.cpp +++ b/src/Storages/System/StorageSystemColumns.cpp @@ -490,7 +490,7 @@ void 
ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline, else { const DatabasePtr & database = databases.at(database_name); - for (auto iterator = database->getLightweightTablesIterator(context); iterator->isValid(); iterator->next()) + for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next()) { if (const auto & table = iterator->table()) { diff --git a/src/Storages/System/StorageSystemCompletions.cpp b/src/Storages/System/StorageSystemCompletions.cpp index 458b4c1c0dab..b49851c4f535 100644 --- a/src/Storages/System/StorageSystemCompletions.cpp +++ b/src/Storages/System/StorageSystemCompletions.cpp @@ -122,7 +122,7 @@ void fillDataWithDatabasesTablesColumns(MutableColumns & res_columns, const Cont res_columns[1]->insert(DATABASE_CONTEXT); res_columns[2]->insertDefault(); - for (auto iterator = database_ptr->getLightweightTablesIterator(context); iterator->isValid(); iterator->next()) + for (auto iterator = database_ptr->getTablesIterator(context); iterator->isValid(); iterator->next()) { const auto & table_name = iterator->name(); const auto & table = iterator->table(); diff --git a/src/Storages/System/StorageSystemIcebergHistory.cpp b/src/Storages/System/StorageSystemIcebergHistory.cpp index 87b85120e668..f7ca27e65955 100644 --- a/src/Storages/System/StorageSystemIcebergHistory.cpp +++ b/src/Storages/System/StorageSystemIcebergHistory.cpp @@ -97,7 +97,7 @@ void StorageSystemIcebergHistory::fillData([[maybe_unused]] MutableColumns & res for (const auto & db: databases) { /// with last flag we are filtering out all non iceberg table - for (auto iterator = db.second->getLightweightTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next()) + for (auto iterator = db.second->getTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next()) { StoragePtr storage = iterator->table(); diff --git a/src/Storages/System/StorageSystemTables.cpp 
b/src/Storages/System/StorageSystemTables.cpp index 8bbb70ea2aa2..b96683bd69de 100644 --- a/src/Storages/System/StorageSystemTables.cpp +++ b/src/Storages/System/StorageSystemTables.cpp @@ -122,16 +122,29 @@ ColumnPtr getFilteredTables( } else { - auto table_it = database->getLightweightTablesIterator(context, - /* filter_by_table_name */ {}, - /* skip_not_loaded */ false); - for (; table_it->isValid(); table_it->next()) + if (engine_column || uuid_column) { - table_column->insert(table_it->name()); - if (engine_column) - engine_column->insert(table_it->table()->getName()); - if (uuid_column) - uuid_column->insert(table_it->table()->getStorageID().uuid); + auto table_it = database->getTablesIterator(context, + /* filter_by_table_name */ {}, + /* skip_not_loaded */ false); + for (; table_it->isValid(); table_it->next()) + { + table_column->insert(table_it->name()); + if (engine_column) + engine_column->insert(table_it->table()->getName()); + if (uuid_column) + uuid_column->insert(table_it->table()->getStorageID().uuid); + } + } + else + { + auto table_details = database->getLightweightTablesIterator(context, + /* filter_by_table_name */ {}, + /* skip_not_loaded */ false); + for (const auto & table_detail : table_details) + { + table_column->insert(table_detail.name); + } } } } @@ -291,6 +304,39 @@ class TablesBlockSource : public ISource } } + + size_t fillTableNamesOnly(MutableColumns & res_columns) + { + auto table_details = database->getLightweightTablesIterator(context, + /* filter_by_table_name */ {}, + /* skip_not_loaded */ false); + + size_t count = 0; + + const auto access = context->getAccess(); + for (const auto & table_detail: table_details) + { + if (!tables.contains(table_detail.name)) + continue; + + size_t src_index = 0; + size_t res_index = 0; + + if (!access->isGranted(AccessType::SHOW_TABLES, database_name, table_detail.name)) + continue; + + if (columns_mask[src_index++]) + res_columns[res_index++]->insert(database_name); + + if 
(columns_mask[src_index++]) + res_columns[res_index++]->insert(table_detail.name); + + ++count; + } + ++database_idx; + return count; + } + Chunk generate() override { if (done) @@ -449,8 +495,23 @@ class TablesBlockSource : public ISource const bool need_to_check_access_for_tables = need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name); + /// This is for queries similar to 'show tables', where only name of the table is needed + auto needed_columns = getPort().getHeader().getColumnsWithTypeAndName(); + bool needs_one_column = (needed_columns.size() == 1 && needed_columns[0].name == "name"); + + bool needs_two_columns = (needed_columns.size() == 2 && + ((needed_columns[0].name == "name" && needed_columns[1].name == "database") || + (needed_columns[0].name == "database" && needed_columns[1].name == "name"))); + + if ((needs_one_column || needs_two_columns) && !need_to_check_access_for_tables) + { + size_t rows_added = fillTableNamesOnly(res_columns); + rows_count += rows_added; + continue; + } + if (!tables_it || !tables_it->isValid()) - tables_it = database->getLightweightTablesIterator(context, + tables_it = database->getTablesIterator(context, /* filter_by_table_name */ {}, /* skip_not_loaded */ false); @@ -923,7 +984,6 @@ class TablesBlockSource : public ISource } } } - UInt64 num_rows = res_columns.at(0)->size(); return Chunk(std::move(res_columns), num_rows); } diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py index 3a0e261a2c09..13e18258e5bb 100644 --- a/tests/integration/test_database_glue/test.py +++ b/tests/integration/test_database_glue/test.py @@ -644,6 +644,7 @@ def test_system_tables(started_cluster): assert CATALOG_NAME in node.query("SHOW DATABASES") assert table_name in node.query(f"SHOW TABLES FROM {CATALOG_NAME}") + # system.tables assert int(node.query(f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and table ilike 
'%{root_namespace}%' SETTINGS show_data_lake_catalogs_in_system_tables = true").strip()) == 4 assert int(node.query(f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and table ilike '%{root_namespace}%'").strip()) == 0 @@ -664,6 +665,51 @@ def test_system_tables(started_cluster): assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}') SETTINGS show_data_lake_catalogs_in_system_tables = true").strip()) != 0 assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}')").strip()) == 0 +def test_show_tables_optimization(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_show_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespaces_to_create = [ + root_namespace, + f"{root_namespace}_A", + f"{root_namespace}_B", + f"{root_namespace}_C", + ] + + catalog = load_catalog_impl(started_cluster) + + for namespace in namespaces_to_create: + catalog.create_namespace(namespace) + assert len(catalog.list_tables(namespace)) == 0 + + for namespace in namespaces_to_create: + table = create_table(catalog, namespace, table_name) + + num_rows = 10 + df = generate_arrow_data(num_rows) + table.append(df) + + create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME) + + assert table_name in node.query(f"SHOW TABLES FROM {CATALOG_NAME}") + + assert not node.contains_in_log( + f"Get table information for table {root_namespace}.{table_name}" + ) + + node.query(f"SELECT * from system.tables where table ilike '%{root_namespace}%' SETTINGS show_data_lake_catalogs_in_system_tables = true") + assert node.contains_in_log( + f"Get table information for table {root_namespace}.{table_name}" + ) + + node.query(f"SYSTEM ENABLE FAILPOINT lightweight_show_tables") + try: + node.query(f"SHOW TABLES FROM {CATALOG_NAME}", timeout=5) + finally: + node.query(f"SYSTEM DISABLE FAILPOINT 
lightweight_show_tables") def test_table_without_metadata_location(started_cluster): """