Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ utoipa-axum = { workspace = true }
utoipa-swagger-ui = { workspace = true }
uuid = { workspace = true }
validator = { version = "0.20.0", features = ["derive"] }
once_cell = { version = "1.20.2" }
half = "2.6.0"

[lints]
Expand Down
40 changes: 28 additions & 12 deletions crates/runtime/src/execution/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,38 @@ impl CatalogProvider for CachingCatalog {

/// Returns the names of all schemas in this catalog.
///
/// On first use (empty cache) this eagerly wraps every schema of the
/// underlying catalog in a `CachingSchema` and memoizes it, so later
/// calls are served from `schemas_cache` alone.
fn schema_names(&self) -> Vec<String> {
    if self.schemas_cache.is_empty() {
        for name in self.catalog.schema_names() {
            if let Some(schema) = self.catalog.schema(&name) {
                // `entry` keeps the shard locked across the whole
                // check-and-insert, so a concurrent populator cannot
                // clobber an already-inserted `CachingSchema` (which
                // would silently discard its populated `tables_cache`).
                self.schemas_cache.entry(name.clone()).or_insert_with(|| {
                    Arc::new(CachingSchema {
                        name,
                        schema,
                        tables_cache: DashMap::new(),
                    })
                });
            }
        }
    }
    self.schemas_cache.iter().map(|e| e.key().clone()).collect()
}

#[allow(clippy::as_conversions)]
/// Looks up a schema by name, serving from the cache when possible and
/// otherwise resolving it from the wrapped catalog and memoizing the
/// resulting `CachingSchema`.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
    if let Some(schema) = self.schemas_cache.get(name) {
        return Some(Arc::clone(schema.value()) as Arc<dyn SchemaProvider>);
    }
    // Cache miss: fall back to the underlying catalog.
    let schema = self.catalog.schema(name)?;
    // `entry` makes the check-and-insert atomic per shard: if two threads
    // race on the same miss, exactly one `CachingSchema` (and therefore
    // one `tables_cache`) survives, instead of the loser overwriting the
    // winner and dropping whatever the winner had already cached.
    let caching_schema = Arc::clone(
        self.schemas_cache
            .entry(name.to_string())
            .or_insert_with(|| {
                Arc::new(CachingSchema {
                    name: name.to_string(),
                    schema,
                    tables_cache: DashMap::new(),
                })
            })
            .value(),
    );
    Some(caching_schema as Arc<dyn SchemaProvider>)
}
}
10 changes: 5 additions & 5 deletions crates/runtime/src/execution/catalog/catalog_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ impl EmbucketCatalogList {
{
schema.tables_cache.insert(
table.clone(),
Arc::new(CachingTable {
name: table.to_string(),
schema: Some(table_provider.schema()),
table: Arc::clone(&table_provider),
}),
Arc::new(CachingTable::new_with_schema(
table,
table_provider.schema(),
Arc::clone(&table_provider),
)),
);
}
}
Expand Down
28 changes: 22 additions & 6 deletions crates/runtime/src/execution/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl std::fmt::Debug for CachingSchema {
f.debug_struct("Schema")
.field("schema", &"")
.field("name", &self.name)
.field("tables_cache", &"")
.field("tables_cache", &self.tables_cache)
.finish()
}
}
Expand All @@ -33,6 +33,7 @@ impl SchemaProvider for CachingSchema {
if self.tables_cache.is_empty() {
self.schema.table_names()
} else {
// Don't populate the cache here; the async table() method is responsible for filling it
self.tables_cache
.iter()
.map(|entry| entry.key().clone())
Expand All @@ -47,11 +48,8 @@ impl SchemaProvider for CachingSchema {
} else {
// Fallback to the original schema table if the cache is empty
if let Some(table) = self.schema.table(name).await? {
let caching_table = Arc::new(CachingTable {
name: name.to_string(),
schema: Some(table.schema()),
table: Arc::clone(&table),
});
let caching_table =
Arc::new(CachingTable::new(name.to_string(), Arc::clone(&table)));

// Insert into cache
self.tables_cache
Expand All @@ -64,6 +62,24 @@ impl SchemaProvider for CachingSchema {
}
}

/// Registers `table` with the underlying schema and caches a
/// `CachingTable` wrapper for it.
fn register_table(
    &self,
    name: String,
    table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
    // Register with the backing schema FIRST: if it rejects the table we
    // must not leave a phantom entry in the cache (the original order
    // cached the table even when registration failed).
    let previous = self
        .schema
        .register_table(name.clone(), Arc::clone(&table))?;
    self.tables_cache
        .insert(name.clone(), Arc::new(CachingTable::new(name, table)));
    Ok(previous)
}

/// Removes `name` from the table cache, then deregisters it from the
/// underlying schema.
///
/// The cache entry is dropped first: even if the underlying call fails,
/// a stale wrapper is never served again — the next lookup simply falls
/// back to the backing schema and re-caches.
fn deregister_table(
    &self,
    name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
    self.tables_cache.remove(name);
    self.schema.deregister_table(name)
}

/// Returns `true` if a table named `name` exists in this schema.
///
/// Checks the cache first, then falls back to the underlying schema so
/// tables that have not been cached yet (the cache is filled lazily by
/// the async `table()` path) are still reported as existing. Checking
/// only the cache would wrongly return `false` before first access.
fn table_exist(&self, name: &str) -> bool {
    self.tables_cache.contains_key(name) || self.schema.table_exist(name)
}
Expand Down
25 changes: 20 additions & 5 deletions crates/runtime/src/execution/catalog/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ use datafusion::catalog::{Session, TableProvider};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::ExecutionPlan;
use once_cell::sync::OnceCell;
use std::any::Any;
use std::sync::Arc;

/// A `TableProvider` wrapper that lazily caches the wrapped table's schema.
pub struct CachingTable {
    /// Lazily-initialized schema: filled on first `schema()` call, or
    /// eagerly when constructed via `new_with_schema`.
    pub schema: OnceCell<SchemaRef>,
    /// Table name as registered in the parent schema.
    pub name: String,
    /// The underlying table provider that calls are delegated to.
    pub table: Arc<dyn TableProvider>,
}

impl CachingTable {
    /// Creates a wrapper whose schema is resolved lazily, on the first
    /// `schema()` call.
    pub fn new(name: String, table: Arc<dyn TableProvider>) -> Self {
        Self {
            name,
            table,
            schema: OnceCell::new(),
        }
    }

    /// Creates a wrapper with an already-known schema, so the lazy
    /// resolution never has to touch the underlying provider.
    pub fn new_with_schema(name: String, schema: SchemaRef, table: Arc<dyn TableProvider>) -> Self {
        Self {
            name,
            table,
            schema: OnceCell::from(schema),
        }
    }
}

impl std::fmt::Debug for CachingTable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Table")
Expand All @@ -30,10 +48,7 @@ impl TableProvider for CachingTable {
}

/// Returns the table schema, computing it from the wrapped provider on
/// the first call and serving the cached copy on every call after that.
fn schema(&self) -> SchemaRef {
    let cached = self.schema.get_or_init(|| self.table.schema());
    Arc::clone(cached)
}

fn table_type(&self) -> TableType {
Expand Down
29 changes: 0 additions & 29 deletions crates/runtime/src/execution/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,35 +1141,6 @@ impl UserQuery {
}
}
}
// Unwraps are allowed here because we are sure that objects exists
for catalog in self.session.ctx.state().catalog_list().catalog_names() {
let provider = self
.session
.ctx
.state()
.catalog_list()
.catalog(&catalog)
.unwrap();
for schema in provider.schema_names() {
for table in provider.schema(&schema).unwrap().table_names() {
let table_source = provider
.schema(&schema)
.unwrap()
.table(&table)
.await
.context(super::error::DataFusionSnafu)?
.ok_or(ExecutionError::TableProviderNotFound {
table_name: table.clone(),
})?;
let resolved = state.resolve_table_ref(TableReference::full(
catalog.to_string(),
schema.to_string(),
table,
));
tables.insert(resolved, provider_as_source(table_source));
}
}
}
Ok(tables)
}
// TODO: We need to recursively fix any missing table references with the default
Expand Down
39 changes: 1 addition & 38 deletions crates/runtime/src/execution/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use iceberg_rust::object_store::ObjectStoreBuilder;
use iceberg_s3tables_catalog::S3TablesCatalog;
use snafu::ResultExt;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::env;
use std::sync::Arc;

Expand All @@ -40,8 +40,6 @@ pub struct UserSession {
pub executor: DedicatedExecutor,
}

pub type CatalogsTree = BTreeMap<String, BTreeMap<String, Vec<String>>>;

impl UserSession {
pub async fn new(metastore: Arc<dyn Metastore>) -> ExecutionResult<Self> {
let sql_parser_dialect =
Expand Down Expand Up @@ -155,41 +153,6 @@ impl UserSession {
UserQuery::new(self.clone(), query.into(), query_context)
}

/// Returns a tree of available catalogs, schemas, and table names.
///
/// The structure of the result:
///
/// ```json
/// {
/// "catalog1": {
/// "schema1": ["table1", "table2"],
/// "schema2": ["table3"]
/// },
/// "catalog2": { ... }
/// }
/// ```
///
/// Note: Only table names are retrieved — the actual table metadata is not loaded.
///
/// Returns a [`CatalogsTree`] mapping catalog names to schemas and their tables.
#[must_use]
pub fn fetch_catalogs_tree(&self) -> CatalogsTree {
let mut tree: CatalogsTree = BTreeMap::new();

for catalog_name in self.ctx.catalog_names() {
if let Some(catalog) = self.ctx.catalog(&catalog_name) {
let mut schemas = BTreeMap::new();
for schema_name in catalog.schema_names() {
if let Some(schema) = catalog.schema(&schema_name) {
schemas.insert(schema_name, schema.table_names());
}
}
tree.insert(catalog_name, schemas);
}
}
tree
}

pub fn set_session_variable(
&self,
set: bool,
Expand Down
6 changes: 5 additions & 1 deletion crates/runtime/src/http/ui/navigation_trees/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::execution::error::ExecutionError;
use crate::http::error::ErrorResponse;
use crate::http::ui::error::IntoStatusCode;
use axum::response::IntoResponse;
Expand All @@ -13,13 +14,16 @@ pub type NavigationTreesResult<T> = Result<T, NavigationTreesAPIError>;
/// Errors surfaced by the navigation-trees HTTP API.
pub enum NavigationTreesAPIError {
    /// Metastore lookup failed while building the navigation tree.
    #[snafu(display("Get navigation trees error: {source}"))]
    Get { source: MetastoreError },

    /// Query-engine failure (e.g. while resolving catalogs/schemas).
    #[snafu(display("Execution error: {source}"))]
    Execution { source: ExecutionError },
}

// Map each API error variant to the HTTP status code it should produce.
impl IntoStatusCode for NavigationTreesAPIError {
    fn status_code(&self) -> StatusCode {
        // One arm per variant (no catch-all / or-pattern collapse): adding
        // a new variant forces a deliberate decision here via a compile
        // error rather than silently inheriting 500.
        match self {
            Self::Get { .. } => StatusCode::INTERNAL_SERVER_ERROR,
            Self::Execution { .. } => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}
Expand Down
Loading