diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index a9dc00293..8d6b3fe2b 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -105,6 +105,7 @@ utoipa-axum = { workspace = true } utoipa-swagger-ui = { workspace = true } uuid = { workspace = true } validator = { version = "0.20.0", features = ["derive"] } +once_cell = { version = "1.20.2" } half = "2.6.0" [lints] diff --git a/crates/runtime/src/execution/catalog/catalog.rs b/crates/runtime/src/execution/catalog/catalog.rs index 9a04792bd..ae83ed200 100644 --- a/crates/runtime/src/execution/catalog/catalog.rs +++ b/crates/runtime/src/execution/catalog/catalog.rs @@ -30,22 +30,38 @@ impl CatalogProvider for CachingCatalog { fn schema_names(&self) -> Vec { if self.schemas_cache.is_empty() { - // Fallback to the original catalog schema names if the cache is empty - self.catalog.schema_names() - } else { - self.schemas_cache - .iter() - .map(|entry| entry.key().clone()) - .collect() + for name in self.catalog.schema_names() { + if let Some(schema) = self.catalog.schema(&name) { + self.schemas_cache.insert( + name.clone(), + Arc::new(CachingSchema { + name, + schema, + tables_cache: DashMap::new(), + }), + ); + } + } } + self.schemas_cache.iter().map(|e| e.key().clone()).collect() } #[allow(clippy::as_conversions)] fn schema(&self, name: &str) -> Option> { - self.schemas_cache - .get(name) - .map(|schema| Arc::clone(schema.value()) as Arc) - // Fallback to the original catalog schema if the cache is empty - .or_else(|| self.catalog.schema(name)) + if let Some(schema) = self.schemas_cache.get(name) { + Some(Arc::clone(schema.value()) as Arc) + } else if let Some(schema) = self.catalog.schema(name) { + let caching_schema = Arc::new(CachingSchema { + name: name.to_string(), + schema: Arc::clone(&schema), + tables_cache: DashMap::new(), + }); + + self.schemas_cache + .insert(name.to_string(), Arc::clone(&caching_schema)); + Some(caching_schema as Arc) + } else { + None + } } } diff --git a/crates/runtime/src/execution/catalog/catalog_list.rs b/crates/runtime/src/execution/catalog/catalog_list.rs index 545149557..2b24a7999 100644 --- a/crates/runtime/src/execution/catalog/catalog_list.rs +++ b/crates/runtime/src/execution/catalog/catalog_list.rs @@ -183,11 +183,11 @@ impl EmbucketCatalogList { { schema.tables_cache.insert( table.clone(), - Arc::new(CachingTable { - name: table.to_string(), - schema: Some(table_provider.schema()), - table: Arc::clone(&table_provider), - }), + Arc::new(CachingTable::new_with_schema( + table, + table_provider.schema(), + Arc::clone(&table_provider), + )), ); } } diff --git a/crates/runtime/src/execution/catalog/schema.rs b/crates/runtime/src/execution/catalog/schema.rs index 23091b0e5..57d33e141 100644 --- a/crates/runtime/src/execution/catalog/schema.rs +++ b/crates/runtime/src/execution/catalog/schema.rs @@ -18,7 +18,7 @@ impl std::fmt::Debug for CachingSchema { f.debug_struct("Schema") .field("schema", &"") .field("name", &self.name) - .field("tables_cache", &"") + .field("tables_cache", &self.tables_cache) .finish() } } @@ -33,6 +33,7 @@ impl SchemaProvider for CachingSchema { if self.tables_cache.is_empty() { self.schema.table_names() } else { + // Don't fill the cache since should call async table() to fill it self.tables_cache .iter() .map(|entry| entry.key().clone()) @@ -47,11 +48,8 @@ impl SchemaProvider for CachingSchema { } else { // Fallback to the original schema table if the cache is empty if let Some(table) = self.schema.table(name).await? { - let caching_table = Arc::new(CachingTable { - name: name.to_string(), - schema: Some(table.schema()), - table: Arc::clone(&table), - }); + let caching_table = + Arc::new(CachingTable::new(name.to_string(), Arc::clone(&table))); // Insert into cache self.tables_cache @@ -64,6 +62,24 @@ impl SchemaProvider for CachingSchema { } } + fn register_table( + &self, + name: String, + table: Arc, + ) -> datafusion_common::Result>> { + let caching_table = Arc::new(CachingTable::new(name.clone(), Arc::clone(&table))); + self.tables_cache.insert(name.clone(), caching_table); + self.schema.register_table(name, table) + } + + fn deregister_table( + &self, + name: &str, + ) -> datafusion_common::Result>> { + self.tables_cache.remove(name); + self.schema.deregister_table(name) + } + fn table_exist(&self, name: &str) -> bool { self.tables_cache.contains_key(name) } diff --git a/crates/runtime/src/execution/catalog/table.rs b/crates/runtime/src/execution/catalog/table.rs index d5168b8f5..f42213d6f 100644 --- a/crates/runtime/src/execution/catalog/table.rs +++ b/crates/runtime/src/execution/catalog/table.rs @@ -4,15 +4,33 @@ use datafusion::catalog::{Session, TableProvider}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{Expr, TableType}; use datafusion_physical_plan::ExecutionPlan; +use once_cell::sync::OnceCell; use std::any::Any; use std::sync::Arc; pub struct CachingTable { - pub schema: Option, + pub schema: OnceCell, pub name: String, pub table: Arc, } +impl CachingTable { + pub fn new(name: String, table: Arc) -> Self { + Self { + schema: OnceCell::new(), + name, + table, + } + } + pub fn new_with_schema(name: String, schema: SchemaRef, table: Arc) -> Self { + Self { + schema: OnceCell::from(schema), + name, + table, + } + } +} + impl std::fmt::Debug for CachingTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Table") @@ -30,10 +48,7 @@ impl TableProvider for CachingTable { } fn schema(&self) -> SchemaRef { - match &self.schema { - Some(schema) => Arc::clone(schema), - None => self.table.schema(), - } + self.schema.get_or_init(|| self.table.schema()).clone() } fn table_type(&self) -> TableType { diff --git a/crates/runtime/src/execution/query.rs b/crates/runtime/src/execution/query.rs index dfdd20abd..359761861 100644 --- a/crates/runtime/src/execution/query.rs +++ b/crates/runtime/src/execution/query.rs @@ -1141,35 +1141,6 @@ impl UserQuery { } } } - // Unwraps are allowed here because we are sure that objects exists - for catalog in self.session.ctx.state().catalog_list().catalog_names() { - let provider = self - .session - .ctx - .state() - .catalog_list() - .catalog(&catalog) - .unwrap(); - for schema in provider.schema_names() { - for table in provider.schema(&schema).unwrap().table_names() { - let table_source = provider - .schema(&schema) - .unwrap() - .table(&table) - .await - .context(super::error::DataFusionSnafu)? - .ok_or(ExecutionError::TableProviderNotFound { - table_name: table.clone(), - })?; - let resolved = state.resolve_table_ref(TableReference::full( - catalog.to_string(), - schema.to_string(), - table, - )); - tables.insert(resolved, provider_as_source(table_source)); - } - } - } Ok(tables) } // TODO: We need to recursively fix any missing table references with the default diff --git a/crates/runtime/src/execution/session.rs b/crates/runtime/src/execution/session.rs index eb2cd3e40..982e7a4ed 100644 --- a/crates/runtime/src/execution/session.rs +++ b/crates/runtime/src/execution/session.rs @@ -29,7 +29,7 @@ use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_s3tables_catalog::S3TablesCatalog; use snafu::ResultExt; use std::any::Any; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::env; use std::sync::Arc; @@ -40,8 +40,6 @@ pub struct UserSession { pub executor: DedicatedExecutor, } -pub type CatalogsTree = BTreeMap>>; - impl UserSession { pub async fn new(metastore: Arc) -> ExecutionResult { let sql_parser_dialect = @@ -155,41 +153,6 @@ impl UserSession { UserQuery::new(self.clone(), query.into(), query_context) } - /// Returns a tree of available catalogs, schemas, and table names. - /// - /// The structure of the result: - /// - /// ```json - /// { - /// "catalog1": { - /// "schema1": ["table1", "table2"], - /// "schema2": ["table3"] - /// }, - /// "catalog2": { ... } - /// } - /// ``` - /// - /// Note: Only table names are retrieved — the actual table metadata is not loaded. - /// - /// Returns a [`CatalogsTree`] mapping catalog names to schemas and their tables. - #[must_use] - pub fn fetch_catalogs_tree(&self) -> CatalogsTree { - let mut tree: CatalogsTree = BTreeMap::new(); - - for catalog_name in self.ctx.catalog_names() { - if let Some(catalog) = self.ctx.catalog(&catalog_name) { - let mut schemas = BTreeMap::new(); - for schema_name in catalog.schema_names() { - if let Some(schema) = catalog.schema(&schema_name) { - schemas.insert(schema_name, schema.table_names()); - } - } - tree.insert(catalog_name, schemas); - } - } - tree - } - pub fn set_session_variable( &self, set: bool, diff --git a/crates/runtime/src/http/ui/navigation_trees/error.rs b/crates/runtime/src/http/ui/navigation_trees/error.rs index 41c853977..e9ea727a2 100644 --- a/crates/runtime/src/http/ui/navigation_trees/error.rs +++ b/crates/runtime/src/http/ui/navigation_trees/error.rs @@ -1,3 +1,4 @@ +use crate::execution::error::ExecutionError; use crate::http::error::ErrorResponse; use crate::http::ui::error::IntoStatusCode; use axum::response::IntoResponse; @@ -13,13 +14,16 @@ pub type NavigationTreesResult = Result; pub enum NavigationTreesAPIError { #[snafu(display("Get navigation trees error: {source}"))] Get { source: MetastoreError }, + + #[snafu(display("Execution error: {source}"))] + Execution { source: ExecutionError }, } // Select which status code to return. impl IntoStatusCode for NavigationTreesAPIError { fn status_code(&self) -> StatusCode { match self { - Self::Get { .. } => StatusCode::INTERNAL_SERVER_ERROR, + Self::Get { .. } | Self::Execution { .. } => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/crates/runtime/src/http/ui/navigation_trees/handlers.rs b/crates/runtime/src/http/ui/navigation_trees/handlers.rs index c0390bd95..92b0afcef 100644 --- a/crates/runtime/src/http/ui/navigation_trees/handlers.rs +++ b/crates/runtime/src/http/ui/navigation_trees/handlers.rs @@ -1,14 +1,18 @@ +use crate::execution::error::ExecutionError; +use crate::execution::query::QueryContext; use crate::http::error::ErrorResponse; +use crate::http::session::DFSessionId; use crate::http::state::AppState; use crate::http::ui::navigation_trees::error::{NavigationTreesAPIError, NavigationTreesResult}; use crate::http::ui::navigation_trees::models::{ NavigationTreeDatabase, NavigationTreeSchema, NavigationTreeTable, NavigationTreesParameters, NavigationTreesResponse, }; +use arrow_array::{RecordBatch, StringArray}; use axum::extract::Query; use axum::{extract::State, Json}; -use embucket_metastore::error::MetastoreError; -use embucket_utils::scan_iterator::ScanIterator; +use datafusion_common::DataFusionError; +use std::collections::BTreeMap; use utoipa::OpenApi; #[derive(OpenApi)] @@ -35,8 +39,8 @@ pub struct ApiDoc; get, operation_id = "getNavigationTrees", params( - ("cursor" = Option, Query, description = "Navigation trees cursor"), - ("limit" = Option, Query, description = "Navigation trees limit"), + ("offset" = Option, Query, description = "Navigation trees offset"), + ("limit" = Option, Query, description = "Navigation trees limit"), ), tags = ["navigation-trees"], path = "/ui/navigation-trees", @@ -45,69 +49,77 @@ pub struct ApiDoc; (status = 500, description = "Internal server error", body = ErrorResponse) ) )] -#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))] pub async fn get_navigation_trees( - Query(parameters): Query, + DFSessionId(session_id): DFSessionId, + Query(params): Query, State(state): State, ) -> NavigationTreesResult> { - let rw_databases = state - .metastore - .iter_databases() - .cursor(parameters.cursor.clone()) - .limit(parameters.limit) - .collect() + let (batches, _) = state + .execution_svc + .query( + &session_id, + "SELECT table_catalog, table_schema, table_name FROM information_schema.tables", + QueryContext::default(), + ) .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; + .map_err(|e| NavigationTreesAPIError::Execution { source: e })?; - let next_cursor = rw_databases - .iter() - .last() - .map_or(String::new(), |rw_object| rw_object.ident.clone()); + let mut catalogs_tree: BTreeMap>> = BTreeMap::new(); - let mut databases: Vec = vec![]; - for rw_database in rw_databases { - let rw_schemas = state - .metastore - .iter_schemas(&rw_database.ident.clone()) - .collect() - .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; + for batch in batches { + let catalog_col = downcast_string_column(&batch, "table_catalog")?; + let schema_col = downcast_string_column(&batch, "table_schema")?; + let name_col = downcast_string_column(&batch, "table_name")?; - let mut schemas: Vec = vec![]; - for rw_schema in rw_schemas { - let rw_tables = state - .metastore - .iter_tables(&rw_schema.ident) - .collect() - .await - .map_err(|e| NavigationTreesAPIError::Get { - source: MetastoreError::UtilSlateDB { source: e }, - })?; + for i in 0..batch.num_rows() { + let catalog = catalog_col.value(i).to_string(); + let schema = schema_col.value(i).to_string(); + let name = name_col.value(i).to_string(); - let mut tables: Vec = vec![]; - for rw_table in rw_tables { - tables.push(NavigationTreeTable { - name: rw_table.ident.table.clone(), - }); - } - schemas.push(NavigationTreeSchema { - name: rw_schema.ident.schema.clone(), - tables, - }); + catalogs_tree + .entry(catalog) + .or_default() + .entry(schema) + .or_default() + .push(name); } - databases.push(NavigationTreeDatabase { - name: rw_database.ident.clone(), - schemas, - }); } - Ok(Json(NavigationTreesResponse { - items: databases, - current_cursor: parameters.cursor, - next_cursor, - })) + let offset = params.offset.unwrap_or(0); + let limit = params.limit.map_or(usize::MAX, usize::from); + + let items = catalogs_tree + .into_iter() + .skip(offset) + .take(limit) + .map(|(catalog_name, schemas_map)| NavigationTreeDatabase { + name: catalog_name, + schemas: schemas_map + .into_iter() + .map(|(schema_name, table_names)| NavigationTreeSchema { + name: schema_name, + tables: table_names + .into_iter() + .map(|name| NavigationTreeTable { name }) + .collect(), + }) + .collect(), + }) + .collect(); + + Ok(Json(NavigationTreesResponse { items })) +} + +fn downcast_string_column<'a>( + batch: &'a RecordBatch, + name: &str, +) -> Result<&'a StringArray, NavigationTreesAPIError> { + batch + .column_by_name(name) + .and_then(|col| col.as_any().downcast_ref::()) + .ok_or_else(|| NavigationTreesAPIError::Execution { + source: ExecutionError::DataFusion { + source: DataFusionError::Internal(format!("Missing or invalid column: '{name}'")), + }, + }) } diff --git a/crates/runtime/src/http/ui/navigation_trees/models.rs b/crates/runtime/src/http/ui/navigation_trees/models.rs index d354ac075..4ff528f9d 100644 --- a/crates/runtime/src/http/ui/navigation_trees/models.rs +++ b/crates/runtime/src/http/ui/navigation_trees/models.rs @@ -7,8 +7,6 @@ use validator::Validate; #[serde(rename_all = "camelCase")] pub struct NavigationTreesResponse { pub items: Vec, - pub current_cursor: Option, - pub next_cursor: String, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Validate, ToSchema)] @@ -33,7 +31,7 @@ pub struct NavigationTreeTable { #[derive(Debug, Deserialize, ToSchema, IntoParams)] pub struct NavigationTreesParameters { - pub cursor: Option, + pub offset: Option, #[serde(default = "default_limit")] pub limit: Option, } diff --git a/crates/runtime/src/http/ui/tables/error.rs b/crates/runtime/src/http/ui/tables/error.rs index 4790e7ac6..8c5438400 100644 --- a/crates/runtime/src/http/ui/tables/error.rs +++ b/crates/runtime/src/http/ui/tables/error.rs @@ -34,8 +34,8 @@ pub enum TableError { pub enum TablesAPIError { #[snafu(display("Create table error: {source}"))] CreateUpload { source: TableError }, - #[snafu(display("Get table error: {source}"))] - GetExecution { source: ExecutionError }, + #[snafu(display("Execution error: {source}"))] + Execution { source: ExecutionError }, #[snafu(display("Get table error: {source}"))] GetMetastore { source: MetastoreError }, } @@ -60,7 +60,7 @@ impl IntoStatusCode for TablesAPIError { } => StatusCode::UNPROCESSABLE_ENTITY, _ => StatusCode::INTERNAL_SERVER_ERROR, }, - Self::GetExecution { source } => match &source { + Self::Execution { source } => match &source { ExecutionError::TableNotFound { .. } => StatusCode::NOT_FOUND, _ => StatusCode::INTERNAL_SERVER_ERROR, }, diff --git a/crates/runtime/src/http/ui/tables/handlers.rs b/crates/runtime/src/http/ui/tables/handlers.rs index f3d438e89..4c749e8ba 100644 --- a/crates/runtime/src/http/ui/tables/handlers.rs +++ b/crates/runtime/src/http/ui/tables/handlers.rs @@ -11,7 +11,7 @@ use crate::http::ui::tables::models::{ TablePreviewDataResponse, TablePreviewDataRow, TableStatistics, TableStatisticsResponse, TableUploadPayload, TableUploadResponse, TablesParameters, TablesResponse, UploadParameters, }; -use arrow_array::{Array, StringArray}; +use arrow::util::display::array_value_to_string; use axum::extract::Query; use axum::{ extract::{Multipart, Path, State}, @@ -140,15 +140,15 @@ pub async fn get_table_statistics( pub async fn get_table_columns( DFSessionId(session_id): DFSessionId, State(state): State, - Path((database_name, schema_name, table_name)): Path<(String, String, String)>, + Path((database, schema, table)): Path<(String, String, String)>, ) -> TablesResult> { - let context = QueryContext::new(Some(database_name.clone()), Some(schema_name.clone()), None); - let sql_string = format!("SELECT * FROM {database_name}.{schema_name}.{table_name} LIMIT 0"); + let context = QueryContext::new(Some(database.clone()), Some(schema.clone()), None); + let sql_string = format!("SELECT * FROM {database}.{schema}.{table} LIMIT 0"); let (_, column_infos) = state .execution_svc .query(&session_id, sql_string.as_str(), context) .await - .map_err(|e| TablesAPIError::GetExecution { source: e })?; + .map_err(|e| TablesAPIError::Execution { source: e })?; let items: Vec = column_infos .iter() .map(|column_info| TableColumn { @@ -193,39 +193,10 @@ pub async fn get_table_preview_data( DFSessionId(session_id): DFSessionId, Query(parameters): Query, State(state): State, - Path((database_name, schema_name, table_name)): Path<(String, String, String)>, + Path((database, schema, table)): Path<(String, String, String)>, ) -> TablesResult> { - let context = QueryContext::new(Some(database_name.clone()), Some(schema_name.clone()), None); - let ident = MetastoreTableIdent::new(&database_name, &schema_name, &table_name); - let column_names = match state.metastore.get_table(&ident).await { - Ok(Some(rw_object)) => { - if let Ok(schema) = rw_object.metadata.current_schema(None) { - let items: Vec = schema.iter().map(|field| field.name.clone()).collect(); - Ok(items) - } else { - Ok(vec![]) - } - } - Ok(None) => Err(TablesAPIError::GetMetastore { - source: MetastoreError::TableNotFound { - table: database_name.clone(), - schema: schema_name.clone(), - db: table_name.clone(), - }, - }), - Err(e) => Err(TablesAPIError::GetMetastore { source: e }), - }?; - let column_names = column_names - .iter() - //UNSUPPORTED TYPES: ListArray, StructArray, Binary (Arrow Cast for Datafusion) - .map(|column_name| { - format!("COALESCE(CAST({column_name} AS STRING), 'Unsupported') AS {column_name}") - }) - .collect::>(); - let sql_string = format!( - "SELECT {} FROM {database_name}.{schema_name}.{table_name}", - column_names.join(", ") - ); + let context = QueryContext::new(Some(database.clone()), Some(schema.clone()), None); + let sql_string = format!("SELECT * FROM {database}.{schema}.{table}"); let sql_string = parameters.offset.map_or(sql_string.clone(), |offset| { format!("{sql_string} OFFSET {offset}") }); @@ -236,21 +207,26 @@ pub async fn get_table_preview_data( .execution_svc .query(&session_id, sql_string.as_str(), context) .await - .map_err(|e| TablesAPIError::GetExecution { source: e })?; - let mut preview_data_columns: Vec = vec![]; + .map_err(|e| TablesAPIError::Execution { source: e })?; + + let mut preview_data_columns = Vec::new(); for batch in &batches { + let schema = batch.schema(); + let num_rows = batch.num_rows(); + for (i, column) in batch.columns().iter().enumerate() { - let preview_data_rows: Vec = column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|row| TablePreviewDataRow { - data: row.unwrap().to_string(), + let array = column.as_ref(); + + let preview_data_rows: Vec = (0..num_rows) + .map(|row_index| { + let data = array_value_to_string(array, row_index) + .unwrap_or_else(|_| "ERROR".to_string()); + TablePreviewDataRow { data } }) .collect(); + preview_data_columns.push(TablePreviewDataColumn { - name: batch.schema().fields[i].name().to_string(), + name: schema.field(i).name().to_string(), rows: preview_data_rows, }); } diff --git a/crates/runtime/src/http/ui/tests/navigation_trees.rs b/crates/runtime/src/http/ui/tests/navigation_trees.rs index 9d54f8842..3561e7a70 100644 --- a/crates/runtime/src/http/ui/tests/navigation_trees.rs +++ b/crates/runtime/src/http/ui/tests/navigation_trees.rs @@ -113,7 +113,7 @@ async fn test_ui_databases_navigation() { let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); assert_eq!(4, databases_navigation.items.len()); assert_eq!(1, databases_navigation.items.first().unwrap().schemas.len()); - assert_eq!(0, databases_navigation.items.last().unwrap().schemas.len()); + assert_eq!(1, databases_navigation.items.last().unwrap().schemas.len()); let res = req( &client, @@ -167,9 +167,22 @@ async fn test_ui_databases_navigation() { .unwrap(); assert_eq!(http::StatusCode::OK, res.status()); let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); - assert_eq!( 1, + databases_navigation + .items + .first() + .unwrap() + .schemas + .last() + .unwrap() + .tables + .len() + ); + + // Information schema tables + assert_eq!( + 7, databases_navigation .items .first() @@ -193,11 +206,10 @@ async fn test_ui_databases_navigation() { let databases_navigation: NavigationTreesResponse = res.json().await.unwrap(); assert_eq!(2, databases_navigation.items.len()); assert_eq!("test1", databases_navigation.items.first().unwrap().name); - let cursor = databases_navigation.next_cursor; let res = req( &client, Method::GET, - &format!("{url}?cursor={cursor}"), + &format!("{url}?offset=2"), String::new(), ) .await