From afe95284b447861756ab41f1586db819b49ae9a0 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 12:34:07 +0300 Subject: [PATCH 1/5] Add external catalogs to catalogs tree, fixed data preview for external catalogs --- crates/runtime/Cargo.toml | 1 + .../runtime/src/execution/catalog/catalog.rs | 40 +++++--- .../src/execution/catalog/catalog_list.rs | 10 +- .../runtime/src/execution/catalog/schema.rs | 25 +++-- crates/runtime/src/execution/catalog/table.rs | 25 ++++- crates/runtime/src/execution/query.rs | 29 ------ .../src/http/ui/navigation_trees/error.rs | 6 +- .../src/http/ui/navigation_trees/handlers.rs | 92 ++++++++----------- crates/runtime/src/http/ui/tables/error.rs | 6 +- crates/runtime/src/http/ui/tables/handlers.rs | 70 +++++--------- 10 files changed, 142 insertions(+), 162 deletions(-) diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 08bd9e56f..693c0512b 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -105,6 +105,7 @@ utoipa-axum = { workspace = true } utoipa-swagger-ui = { workspace = true } uuid = { workspace = true } validator = { version = "0.20.0", features = ["derive"] } +once_cell = { version = "1.20.2" } [lints] workspace = true diff --git a/crates/runtime/src/execution/catalog/catalog.rs b/crates/runtime/src/execution/catalog/catalog.rs index 9a04792bd..ae83ed200 100644 --- a/crates/runtime/src/execution/catalog/catalog.rs +++ b/crates/runtime/src/execution/catalog/catalog.rs @@ -30,22 +30,38 @@ impl CatalogProvider for CachingCatalog { fn schema_names(&self) -> Vec { if self.schemas_cache.is_empty() { - // Fallback to the original catalog schema names if the cache is empty - self.catalog.schema_names() - } else { - self.schemas_cache - .iter() - .map(|entry| entry.key().clone()) - .collect() + for name in self.catalog.schema_names() { + if let Some(schema) = self.catalog.schema(&name) { + self.schemas_cache.insert( + name.clone(), + Arc::new(CachingSchema { + name, + schema, + tables_cache: DashMap::new(), + }), + ); + } + } } + self.schemas_cache.iter().map(|e| e.key().clone()).collect() } #[allow(clippy::as_conversions)] fn schema(&self, name: &str) -> Option> { - self.schemas_cache - .get(name) - .map(|schema| Arc::clone(schema.value()) as Arc) - // Fallback to the original catalog schema if the cache is empty - .or_else(|| self.catalog.schema(name)) + if let Some(schema) = self.schemas_cache.get(name) { + Some(Arc::clone(schema.value()) as Arc) + } else if let Some(schema) = self.catalog.schema(name) { + let caching_schema = Arc::new(CachingSchema { + name: name.to_string(), + schema: Arc::clone(&schema), + tables_cache: DashMap::new(), + }); + + self.schemas_cache + .insert(name.to_string(), Arc::clone(&caching_schema)); + Some(caching_schema as Arc) + } else { + None + } } } diff --git a/crates/runtime/src/execution/catalog/catalog_list.rs b/crates/runtime/src/execution/catalog/catalog_list.rs index 545149557..2b24a7999 100644 --- a/crates/runtime/src/execution/catalog/catalog_list.rs +++ b/crates/runtime/src/execution/catalog/catalog_list.rs @@ -183,11 +183,11 @@ impl EmbucketCatalogList { { schema.tables_cache.insert( table.clone(), - Arc::new(CachingTable { - name: table.to_string(), - schema: Some(table_provider.schema()), - table: Arc::clone(&table_provider), - }), + Arc::new(CachingTable::new_with_schema( + table, + table_provider.schema(), + Arc::clone(&table_provider), + )), ); } } diff --git a/crates/runtime/src/execution/catalog/schema.rs b/crates/runtime/src/execution/catalog/schema.rs index 23091b0e5..71d3fc693 100644 --- a/crates/runtime/src/execution/catalog/schema.rs +++ b/crates/runtime/src/execution/catalog/schema.rs @@ -18,7 +18,7 @@ impl std::fmt::Debug for CachingSchema { f.debug_struct("Schema") .field("schema", &"") .field("name", &self.name) - .field("tables_cache", &"") + .field("tables_cache", &self.tables_cache) .finish() } } @@ -33,6 +33,7 @@ impl SchemaProvider for CachingSchema { if self.tables_cache.is_empty() { self.schema.table_names() } else { + // Don't fill the cache since should call async table() to fill it self.tables_cache .iter() .map(|entry| entry.key().clone()) @@ -47,11 +48,8 @@ impl SchemaProvider for CachingSchema { } else { // Fallback to the original schema table if the cache is empty if let Some(table) = self.schema.table(name).await? { - let caching_table = Arc::new(CachingTable { - name: name.to_string(), - schema: Some(table.schema()), - table: Arc::clone(&table), - }); + let caching_table = + Arc::new(CachingTable::new(name.to_string(), Arc::clone(&table))); // Insert into cache self.tables_cache @@ -64,6 +62,21 @@ impl SchemaProvider for CachingSchema { } } + fn register_table( + &self, + name: String, + table: Arc, + ) -> datafusion_common::Result>> { + self.schema.register_table(name, table) + } + + fn deregister_table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + self.schema.deregister_table(name) + } + fn table_exist(&self, name: &str) -> bool { self.tables_cache.contains_key(name) } diff --git a/crates/runtime/src/execution/catalog/table.rs b/crates/runtime/src/execution/catalog/table.rs index d5168b8f5..f42213d6f 100644 --- a/crates/runtime/src/execution/catalog/table.rs +++ b/crates/runtime/src/execution/catalog/table.rs @@ -4,15 +4,33 @@ use datafusion::catalog::{Session, TableProvider}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, TableType}; use datafusion_physical_plan::ExecutionPlan; +use once_cell::sync::OnceCell; use std::any::Any; use std::sync::Arc; pub struct CachingTable { - pub schema: Option, + pub schema: OnceCell, pub name: String, pub table: Arc, } +impl CachingTable { + pub fn new(name: String, table: Arc) -> Self { + Self { + schema: OnceCell::new(), + name, + table, + } + } + pub fn new_with_schema(name: String, schema: SchemaRef, table: Arc) -> Self { + Self { + schema: OnceCell::from(schema), + name, + table, + } + } +} + impl std::fmt::Debug for CachingTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Table") @@ -30,10 +48,7 @@ impl TableProvider for CachingTable { } fn schema(&self) -> SchemaRef { - match &self.schema { - Some(schema) => Arc::clone(schema), - None => self.table.schema(), - } + self.schema.get_or_init(|| self.table.schema()).clone() } fn table_type(&self) -> TableType { diff --git a/crates/runtime/src/execution/query.rs b/crates/runtime/src/execution/query.rs index dfdd20abd..359761861 100644 --- a/crates/runtime/src/execution/query.rs +++ b/crates/runtime/src/execution/query.rs @@ -1141,35 +1141,6 @@ impl UserQuery { } } } - // Unwraps are allowed here because we are sure that objects exists - for catalog in self.session.ctx.state().catalog_list().catalog_names() { - let provider = self - .session - .ctx - .state() - .catalog_list() - .catalog(&catalog) - .unwrap(); - for schema in provider.schema_names() { - for table in provider.schema(&schema).unwrap().table_names() { - let table_source = provider - .schema(&schema) - .unwrap() - .table(&table) - .await - .context(super::error::DataFusionSnafu)? - .ok_or(ExecutionError::TableProviderNotFound { - table_name: table.clone(), - })?; - let resolved = state.resolve_table_ref(TableReference::full( - catalog.to_string(), - schema.to_string(), - table, - )); - tables.insert(resolved, provider_as_source(table_source)); - } - } - } Ok(tables) } // TODO: We need to recursively fix any missing table references with the default diff --git a/crates/runtime/src/http/ui/navigation_trees/error.rs b/crates/runtime/src/http/ui/navigation_trees/error.rs index 41c853977..b59374d06 100644 --- a/crates/runtime/src/http/ui/navigation_trees/error.rs +++ b/crates/runtime/src/http/ui/navigation_trees/error.rs @@ -1,3 +1,4 @@ +use crate::execution::error::ExecutionError; use crate::http::error::ErrorResponse; use crate::http::ui::error::IntoStatusCode; use axum::response::IntoResponse; @@ -13,13 +14,16 @@ pub type NavigationTreesResult = Result; pub enum NavigationTreesAPIError { #[snafu(display("Get navigation trees error: {source}"))] Get { source: MetastoreError }, + + #[snafu(display("Failed to create session error: {source}"))] + SessionError { source: ExecutionError }, } // Select which status code to return. impl IntoStatusCode for NavigationTreesAPIError { fn status_code(&self) -> StatusCode { match self { - Self::Get { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Self::Get { .. } | Self::SessionError { .. } => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/crates/runtime/src/http/ui/navigation_trees/handlers.rs b/crates/runtime/src/http/ui/navigation_trees/handlers.rs index c0390bd95..7079052b7 100644 --- a/crates/runtime/src/http/ui/navigation_trees/handlers.rs +++ b/crates/runtime/src/http/ui/navigation_trees/handlers.rs @@ -1,14 +1,14 @@ use crate::http::error::ErrorResponse; +use crate::http::session::DFSessionId; use crate::http::state::AppState; -use crate::http::ui::navigation_trees::error::{NavigationTreesAPIError, NavigationTreesResult}; +use crate::http::ui::navigation_trees::error::{self as error, NavigationTreesResult}; use crate::http::ui::navigation_trees::models::{ NavigationTreeDatabase, NavigationTreeSchema, NavigationTreeTable, NavigationTreesParameters, NavigationTreesResponse, }; use axum::extract::Query; use axum::{extract::State, Json}; -use embucket_metastore::error::MetastoreError; -use embucket_utils::scan_iterator::ScanIterator; +use snafu::ResultExt; use utoipa::OpenApi; #[derive(OpenApi)] @@ -47,67 +47,51 @@ pub struct ApiDoc; )] #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))] pub async fn get_navigation_trees( + DFSessionId(session_id): DFSessionId, Query(parameters): Query, State(state): State, ) -> NavigationTreesResult> { - let rw_databases = state - .metastore - .iter_databases() - .cursor(parameters.cursor.clone()) - .limit(parameters.limit) - .collect() + let catalogs_tree = state + .execution_svc + .create_session(session_id) .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; + .context(error::SessionSnafu)? + .fetch_catalogs_tree(); - let next_cursor = rw_databases - .iter() - .last() - .map_or(String::new(), |rw_object| rw_object.ident.clone()); + let offset = parameters + .cursor + .as_deref() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + let limit = parameters.limit.map_or(usize::MAX, usize::from); - let mut databases: Vec = vec![]; - for rw_database in rw_databases { - let rw_schemas = state - .metastore - .iter_schemas(&rw_database.ident.clone()) - .collect() - .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; + let items: Vec<_> = catalogs_tree + .into_iter() + .skip(offset) + .take(limit) + .map(|(catalog_name, schemas_map)| { + let schemas = schemas_map + .into_iter() + .map(|(schema_name, table_names)| NavigationTreeSchema { + name: schema_name, + tables: table_names + .into_iter() + .map(|name| NavigationTreeTable { name }) + .collect(), + }) + .collect(); - let mut schemas: Vec = vec![]; - for rw_schema in rw_schemas { - let rw_tables = state - .metastore - .iter_tables(&rw_schema.ident) - .collect() - .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; - - let mut tables: Vec = vec![]; - for rw_table in rw_tables { - tables.push(NavigationTreeTable { - name: rw_table.ident.table.clone(), - }); + NavigationTreeDatabase { + name: catalog_name, + schemas, } - schemas.push(NavigationTreeSchema { - name: rw_schema.ident.schema.clone(), - tables, - }); - } - databases.push(NavigationTreeDatabase { - name: rw_database.ident.clone(), - schemas, - }); - } + }) + .collect(); + let next_cursor = (items.len() == limit).then(|| (offset + limit).to_string()); Ok(Json(NavigationTreesResponse { - items: databases, + items, current_cursor: parameters.cursor, - next_cursor, + next_cursor: next_cursor.unwrap_or_default(), })) } diff --git a/crates/runtime/src/http/ui/tables/error.rs b/crates/runtime/src/http/ui/tables/error.rs index 4790e7ac6..8c5438400 100644 --- a/crates/runtime/src/http/ui/tables/error.rs +++ b/crates/runtime/src/http/ui/tables/error.rs @@ -34,8 +34,8 @@ pub enum TableError { pub enum TablesAPIError { #[snafu(display("Create table error: {source}"))] CreateUpload { source: TableError }, - #[snafu(display("Get table error: {source}"))] - GetExecution { source: ExecutionError }, + #[snafu(display("Execution error: {source}"))] + Execution { source: ExecutionError }, #[snafu(display("Get table error: {source}"))] GetMetastore { source: MetastoreError }, } @@ -60,7 +60,7 @@ impl IntoStatusCode for TablesAPIError { } => StatusCode::UNPROCESSABLE_ENTITY, _ => StatusCode::INTERNAL_SERVER_ERROR, }, - Self::GetExecution { source } => match &source { + Self::Execution { source } => match &source { ExecutionError::TableNotFound { .. } => StatusCode::NOT_FOUND, _ => StatusCode::INTERNAL_SERVER_ERROR, }, diff --git a/crates/runtime/src/http/ui/tables/handlers.rs b/crates/runtime/src/http/ui/tables/handlers.rs index f3d438e89..4c749e8ba 100644 --- a/crates/runtime/src/http/ui/tables/handlers.rs +++ b/crates/runtime/src/http/ui/tables/handlers.rs @@ -11,7 +11,7 @@ use crate::http::ui::tables::models::{ TablePreviewDataResponse, TablePreviewDataRow, TableStatistics, TableStatisticsResponse, TableUploadPayload, TableUploadResponse, TablesParameters, TablesResponse, UploadParameters, }; -use arrow_array::{Array, StringArray}; +use arrow::util::display::array_value_to_string; use axum::extract::Query; use axum::{ extract::{Multipart, Path, State}, @@ -140,15 +140,15 @@ pub async fn get_table_statistics( pub async fn get_table_columns( DFSessionId(session_id): DFSessionId, State(state): State, - Path((database_name, schema_name, table_name)): Path<(String, String, String)>, + Path((database, schema, table)): Path<(String, String, String)>, ) -> TablesResult> { - let context = QueryContext::new(Some(database_name.clone()), Some(schema_name.clone()), None); - let sql_string = format!("SELECT * FROM {database_name}.{schema_name}.{table_name} LIMIT 0"); + let context = QueryContext::new(Some(database.clone()), Some(schema.clone()), None); + let sql_string = format!("SELECT * FROM {database}.{schema}.{table} LIMIT 0"); let (_, column_infos) = state .execution_svc .query(&session_id, sql_string.as_str(), context) .await - .map_err(|e| TablesAPIError::GetExecution { source: e })?; + .map_err(|e| TablesAPIError::Execution { source: e })?; let items: Vec = column_infos .iter() .map(|column_info| TableColumn { @@ -193,39 +193,10 @@ pub async fn get_table_preview_data( DFSessionId(session_id): DFSessionId, Query(parameters): Query, State(state): State, - Path((database_name, schema_name, table_name)): Path<(String, String, String)>, + Path((database, schema, table)): Path<(String, String, String)>, ) -> TablesResult> { - let context = QueryContext::new(Some(database_name.clone()), Some(schema_name.clone()), None); - let ident = MetastoreTableIdent::new(&database_name, &schema_name, &table_name); - let column_names = match state.metastore.get_table(&ident).await { - Ok(Some(rw_object)) => { - if let Ok(schema) = rw_object.metadata.current_schema(None) { - let items: Vec = schema.iter().map(|field| field.name.clone()).collect(); - Ok(items) - } else { - Ok(vec![]) - } - } - Ok(None) => Err(TablesAPIError::GetMetastore { - source: MetastoreError::TableNotFound { - table: database_name.clone(), - schema: schema_name.clone(), - db: table_name.clone(), - }, - }), - Err(e) => Err(TablesAPIError::GetMetastore { source: e }), - }?; - let column_names = column_names - .iter() - //UNSUPPORTED TYPES: ListArray, StructArray, Binary (Arrow Cast for Datafusion) - .map(|column_name| { - format!("COALESCE(CAST({column_name} AS STRING), 'Unsupported') AS {column_name}") - }) - .collect::>(); - let sql_string = format!( - "SELECT {} FROM {database_name}.{schema_name}.{table_name}", - column_names.join(", ") - ); + let context = QueryContext::new(Some(database.clone()), Some(schema.clone()), None); + let sql_string = format!("SELECT * FROM {database}.{schema}.{table}"); let sql_string = parameters.offset.map_or(sql_string.clone(), |offset| { format!("{sql_string} OFFSET {offset}") }); @@ -236,21 +207,26 @@ pub async fn get_table_preview_data( .execution_svc .query(&session_id, sql_string.as_str(), context) .await - .map_err(|e| TablesAPIError::GetExecution { source: e })?; - let mut preview_data_columns: Vec = vec![]; + .map_err(|e| TablesAPIError::Execution { source: e })?; + + let mut preview_data_columns = Vec::new(); for batch in &batches { + let schema = batch.schema(); + let num_rows = batch.num_rows(); + for (i, column) in batch.columns().iter().enumerate() { - let preview_data_rows: Vec = column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|row| TablePreviewDataRow { - data: row.unwrap().to_string(), + let array = column.as_ref(); + + let preview_data_rows: Vec = (0..num_rows) + .map(|row_index| { + let data = array_value_to_string(array, row_index) + .unwrap_or_else(|_| "ERROR".to_string()); + TablePreviewDataRow { data } }) .collect(); + preview_data_columns.push(TablePreviewDataColumn { - name: batch.schema().fields[i].name().to_string(), + name: schema.field(i).name().to_string(), rows: preview_data_rows, }); } From 6900ef3962ce4697a73be2cfeed2ba2375e7a2b7 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 12:53:20 +0300 Subject: [PATCH 2/5] Add register/deregister tables with cache --- crates/runtime/src/execution/catalog/schema.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/runtime/src/execution/catalog/schema.rs b/crates/runtime/src/execution/catalog/schema.rs index 71d3fc693..57d33e141 100644 --- a/crates/runtime/src/execution/catalog/schema.rs +++ b/crates/runtime/src/execution/catalog/schema.rs @@ -67,6 +67,8 @@ impl SchemaProvider for CachingSchema { name: String, table: Arc, ) -> datafusion_common::Result>> { + let caching_table = Arc::new(CachingTable::new(name.clone(), Arc::clone(&table))); + self.tables_cache.insert(name.clone(), caching_table); self.schema.register_table(name, table) } @@ -74,6 +76,7 @@ impl SchemaProvider for CachingSchema { &self, name: &str, ) -> datafusion_common::Result>> { + self.tables_cache.remove(name); self.schema.deregister_table(name) } From 503d2bf0880f20c2fb8de873d133265a82f48364 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 17:12:16 +0300 Subject: [PATCH 3/5] Rename cursor to offset --- .../src/http/ui/navigation_trees/handlers.rs | 16 ++++++---------- .../src/http/ui/navigation_trees/models.rs | 6 +++--- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/crates/runtime/src/http/ui/navigation_trees/handlers.rs b/crates/runtime/src/http/ui/navigation_trees/handlers.rs index 7079052b7..42741b11d 100644 --- a/crates/runtime/src/http/ui/navigation_trees/handlers.rs +++ b/crates/runtime/src/http/ui/navigation_trees/handlers.rs @@ -35,8 +35,8 @@ pub struct ApiDoc; get, operation_id = "getNavigationTrees", params( - ("cursor" = Option, Query, description = "Navigation trees cursor"), - ("limit" = Option, Query, description = "Navigation trees limit"), + ("offset" = Option, Query, description = "Navigation trees offset"), + ("limit" = Option, Query, description = "Navigation trees limit"), ), tags = ["navigation-trees"], path = "/ui/navigation-trees", @@ -58,11 +58,7 @@ pub async fn get_navigation_trees( .context(error::SessionSnafu)? .fetch_catalogs_tree(); - let offset = parameters - .cursor - .as_deref() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); + let offset = parameters.offset.unwrap_or_default(); let limit = parameters.limit.map_or(usize::MAX, usize::from); let items: Vec<_> = catalogs_tree @@ -88,10 +84,10 @@ pub async fn get_navigation_trees( }) .collect(); - let next_cursor = (items.len() == limit).then(|| (offset + limit).to_string()); + let next_cursor = (items.len() == limit).then(|| offset + limit); Ok(Json(NavigationTreesResponse { items, - current_cursor: parameters.cursor, - next_cursor: next_cursor.unwrap_or_default(), + offset: parameters.offset, + next_offset: next_cursor.unwrap_or_default(), })) } diff --git a/crates/runtime/src/http/ui/navigation_trees/models.rs b/crates/runtime/src/http/ui/navigation_trees/models.rs index d354ac075..b706dbdde 100644 --- a/crates/runtime/src/http/ui/navigation_trees/models.rs +++ b/crates/runtime/src/http/ui/navigation_trees/models.rs @@ -7,8 +7,8 @@ use validator::Validate; #[serde(rename_all = "camelCase")] pub struct NavigationTreesResponse { pub items: Vec, - pub current_cursor: Option, - pub next_cursor: String, + pub offset: Option, + pub next_offset: usize, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Validate, ToSchema)] @@ -33,7 +33,7 @@ pub struct NavigationTreeTable { #[derive(Debug, Deserialize, ToSchema, IntoParams)] pub struct NavigationTreesParameters { - pub cursor: Option, + pub offset: Option, #[serde(default = "default_limit")] pub limit: Option, } From 9cc3bc974989af97fc5d0a944f61709a9942bee7 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 17:27:32 +0300 Subject: [PATCH 4/5] Remove cursors from resp --- .../src/http/ui/navigation_trees/handlers.rs | 27 +++++++------------ .../src/http/ui/navigation_trees/models.rs | 2 -- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/crates/runtime/src/http/ui/navigation_trees/handlers.rs b/crates/runtime/src/http/ui/navigation_trees/handlers.rs index 42741b11d..bf2b8a65a 100644 --- a/crates/runtime/src/http/ui/navigation_trees/handlers.rs +++ b/crates/runtime/src/http/ui/navigation_trees/handlers.rs @@ -48,7 +48,7 @@ pub struct ApiDoc; #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))] pub async fn get_navigation_trees( DFSessionId(session_id): DFSessionId, - Query(parameters): Query, + Query(params): Query, State(state): State, ) -> NavigationTreesResult> { let catalogs_tree = state @@ -58,15 +58,16 @@ pub async fn get_navigation_trees( .context(error::SessionSnafu)? .fetch_catalogs_tree(); - let offset = parameters.offset.unwrap_or_default(); - let limit = parameters.limit.map_or(usize::MAX, usize::from); + let offset = params.offset.unwrap_or(0); + let limit = params.limit.map_or(usize::MAX, usize::from); - let items: Vec<_> = catalogs_tree + let items = catalogs_tree .into_iter() .skip(offset) .take(limit) - .map(|(catalog_name, schemas_map)| { - let schemas = schemas_map + .map(|(catalog_name, schemas_map)| NavigationTreeDatabase { + name: catalog_name, + schemas: schemas_map .into_iter() .map(|(schema_name, table_names)| NavigationTreeSchema { name: schema_name, @@ -75,19 +76,9 @@ pub async fn get_navigation_trees( .map(|name| NavigationTreeTable { name }) .collect(), }) - .collect(); - - NavigationTreeDatabase { - name: catalog_name, - schemas, - } + .collect(), }) .collect(); - let next_cursor = (items.len() == limit).then(|| offset + limit); - Ok(Json(NavigationTreesResponse { - items, - offset: parameters.offset, - next_offset: next_cursor.unwrap_or_default(), - })) + Ok(Json(NavigationTreesResponse { items })) } diff --git a/crates/runtime/src/http/ui/navigation_trees/models.rs b/crates/runtime/src/http/ui/navigation_trees/models.rs index b706dbdde..4ff528f9d 100644 --- a/crates/runtime/src/http/ui/navigation_trees/models.rs +++ b/crates/runtime/src/http/ui/navigation_trees/models.rs @@ -7,8 +7,6 @@ use validator::Validate; #[serde(rename_all = "camelCase")] pub struct NavigationTreesResponse { pub items: Vec, - pub offset: Option, - pub next_offset: usize, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Validate, ToSchema)] From 2ec9750ed7f1c66fd7379d71408b07299313832b Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 6 May 2025 19:01:04 +0300 Subject: [PATCH 5/5] Replace catalog method by sql query --- crates/runtime/src/execution/session.rs | 39 +------------ .../src/http/ui/navigation_trees/error.rs | 6 +- .../src/http/ui/navigation_trees/handlers.rs | 55 ++++++++++++++++--- .../src/http/ui/tests/navigation_trees.rs | 20 +++++-- 4 files changed, 68 insertions(+), 52 deletions(-) diff --git a/crates/runtime/src/execution/session.rs b/crates/runtime/src/execution/session.rs index eb2cd3e40..982e7a4ed 100644 --- a/crates/runtime/src/execution/session.rs +++ b/crates/runtime/src/execution/session.rs @@ -29,7 +29,7 @@ use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_s3tables_catalog::S3TablesCatalog; use snafu::ResultExt; use std::any::Any; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::env; use std::sync::Arc; @@ -40,8 +40,6 @@ pub struct UserSession { pub executor: DedicatedExecutor, } -pub type CatalogsTree = BTreeMap>>; - impl UserSession { pub async fn new(metastore: Arc) -> ExecutionResult { let sql_parser_dialect = @@ -155,41 +153,6 @@ impl UserSession { UserQuery::new(self.clone(), query.into(), query_context) } - /// Returns a tree of available catalogs, schemas, and table names. - /// - /// The structure of the result: - /// - /// ```json - /// { - /// "catalog1": { - /// "schema1": ["table1", "table2"], - /// "schema2": ["table3"] - /// }, - /// "catalog2": { ... } - /// } - /// ``` - /// - /// Note: Only table names are retrieved — the actual table metadata is not loaded. - /// - /// Returns a [`CatalogsTree`] mapping catalog names to schemas and their tables. - #[must_use] - pub fn fetch_catalogs_tree(&self) -> CatalogsTree { - let mut tree: CatalogsTree = BTreeMap::new(); - - for catalog_name in self.ctx.catalog_names() { - if let Some(catalog) = self.ctx.catalog(&catalog_name) { - let mut schemas = BTreeMap::new(); - for schema_name in catalog.schema_names() { - if let Some(schema) = catalog.schema(&schema_name) { - schemas.insert(schema_name, schema.table_names()); - } - } - tree.insert(catalog_name, schemas); - } - } - tree - } - pub fn set_session_variable( &self, set: bool, diff --git a/crates/runtime/src/http/ui/navigation_trees/error.rs b/crates/runtime/src/http/ui/navigation_trees/error.rs index b59374d06..e9ea727a2 100644 --- a/crates/runtime/src/http/ui/navigation_trees/error.rs +++ b/crates/runtime/src/http/ui/navigation_trees/error.rs @@ -15,15 +15,15 @@ pub enum NavigationTreesAPIError { #[snafu(display("Get navigation trees error: {source}"))] Get { source: MetastoreError }, - #[snafu(display("Failed to create session error: {source}"))] - SessionError { source: ExecutionError }, + #[snafu(display("Execution error: {source}"))] + Execution { source: ExecutionError }, } // Select which status code to return. impl IntoStatusCode for NavigationTreesAPIError { fn status_code(&self) -> StatusCode { match self { - Self::Get { .. } | Self::SessionError { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Self::Get { .. } | Self::Execution { .. } => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/crates/runtime/src/http/ui/navigation_trees/handlers.rs b/crates/runtime/src/http/ui/navigation_trees/handlers.rs index bf2b8a65a..92b0afcef 100644 --- a/crates/runtime/src/http/ui/navigation_trees/handlers.rs +++ b/crates/runtime/src/http/ui/navigation_trees/handlers.rs @@ -1,14 +1,18 @@ +use crate::execution::error::ExecutionError; +use crate::execution::query::QueryContext; use crate::http::error::ErrorResponse; use crate::http::session::DFSessionId; use crate::http::state::AppState; -use crate::http::ui::navigation_trees::error::{self as error, NavigationTreesResult}; +use crate::http::ui::navigation_trees::error::{NavigationTreesAPIError, NavigationTreesResult}; use crate::http::ui::navigation_trees::models::{ NavigationTreeDatabase, NavigationTreeSchema, NavigationTreeTable, NavigationTreesParameters, NavigationTreesResponse, }; +use arrow_array::{RecordBatch, StringArray}; use axum::extract::Query; use axum::{extract::State, Json}; -use snafu::ResultExt; +use datafusion_common::DataFusionError; +use std::collections::BTreeMap; use utoipa::OpenApi; #[derive(OpenApi)] @@ -45,18 +49,41 @@ pub struct ApiDoc; (status = 500, description = "Internal server error", body = ErrorResponse) ) )] -#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))] pub async fn get_navigation_trees( DFSessionId(session_id): DFSessionId, Query(params): Query, State(state): State, ) -> NavigationTreesResult> { - let catalogs_tree = state + let (batches, _) = state .execution_svc - .create_session(session_id) + .query( + &session_id, + "SELECT table_catalog, table_schema, table_name FROM information_schema.tables", + QueryContext::default(), + ) .await - .context(error::SessionSnafu)? - .fetch_catalogs_tree(); + .map_err(|e| NavigationTreesAPIError::Execution { source: e })?; + + let mut catalogs_tree: BTreeMap>> = BTreeMap::new(); + + for batch in batches { + let catalog_col = downcast_string_column(&batch, "table_catalog")?; + let schema_col = downcast_string_column(&batch, "table_schema")?; + let name_col = downcast_string_column(&batch, "table_name")?; + + for i in 0..batch.num_rows() { + let catalog = catalog_col.value(i).to_string(); + let schema = schema_col.value(i).to_string(); + let name = name_col.value(i).to_string(); + + catalogs_tree + .entry(catalog) + .or_default() + .entry(schema) + .or_default() + .push(name); + } + } let offset = params.offset.unwrap_or(0); let limit = params.limit.map_or(usize::MAX, usize::from); @@ -82,3 +109,17 @@ pub async fn get_navigation_trees( Ok(Json(NavigationTreesResponse { items })) } + +fn downcast_string_column<'a>( + batch: &'a RecordBatch, + name: &str, +) -> Result<&'a StringArray, NavigationTreesAPIError> { + batch + .column_by_name(name) + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| NavigationTreesAPIError::Execution { + source: ExecutionError::DataFusion { + source: DataFusionError::Internal(format!("Missing or invalid column: '{name}'")), + }, + }) +} diff --git a/crates/runtime/src/http/ui/tests/navigation_trees.rs b/crates/runtime/src/http/ui/tests/navigation_trees.rs index 9d54f8842..3561e7a70 100644 --- a/crates/runtime/src/http/ui/tests/navigation_trees.rs +++ b/crates/runtime/src/http/ui/tests/navigation_trees.rs @@ -113,7 +113,7 @@ async fn test_ui_databases_navigation() { let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); assert_eq!(4, databases_navigation.items.len()); assert_eq!(1, databases_navigation.items.first().unwrap().schemas.len()); - assert_eq!(0, databases_navigation.items.last().unwrap().schemas.len()); + assert_eq!(1, databases_navigation.items.last().unwrap().schemas.len()); let res = req( &client, @@ -167,9 +167,22 @@ async fn test_ui_databases_navigation() { .unwrap(); assert_eq!(http::StatusCode::OK, res.status()); let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); - assert_eq!( 1, + databases_navigation + .items + .first() + .unwrap() + .schemas + .last() + .unwrap() + .tables + .len() + ); + + // Information schema tables + assert_eq!( + 7, databases_navigation .items .first() @@ -193,11 +206,10 @@ async fn test_ui_databases_navigation() { let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); assert_eq!(2, databases_navigation.items.len()); assert_eq!("test1", databases_navigation.items.first().unwrap().name); - let cursor = databases_navigation.next_cursor; let res = req( &client, Method::GET, - &format!("{url}?cursor={cursor}"), + &format!("{url}?offset=2"), String::new(), ) .await