Skip to content

Commit 910b65c

Browse files
authored
Added CaseInsensitiveSchemaDataSourceExec physical optimizer rule (#74)
* merge
* Fix typo
* CaseInsensitiveSchema optimizer rule
* CaseInsensitiveSchema optimizer rule
* Remove case sensitive table provider
* Fix tests
1 parent 51d6e6d commit 910b65c

11 files changed

Lines changed: 205 additions & 188 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -44,15 +44,15 @@ datafusion-expr = { version = "50.0.0" }
4444
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "439cbd2282504c3ffaf262f1ffdb530a0fb1a151" }
4545
datafusion-macros = { version = "50.0.0" }
4646
datafusion-physical-plan = { version = "50.0.0" }
47-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b61b904066672f4753eea1105a1e7c0e760785be" }
47+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "df5ad5825bdb7c65fa3d5ad420e025dfbb51afff" }
4848
futures = { version = "0.3" }
4949
http = "1.2"
5050
http-body-util = "0.1.0"
5151
iceberg = { git = "https://github.com/apache/iceberg-rust.git", rev="7a5ad1fcaf00d4638857812bab788105f6c60573"}
52-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b61b904066672f4753eea1105a1e7c0e760785be" }
53-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b61b904066672f4753eea1105a1e7c0e760785be" }
54-
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b61b904066672f4753eea1105a1e7c0e760785be" }
55-
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "b61b904066672f4753eea1105a1e7c0e760785be" }
52+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "df5ad5825bdb7c65fa3d5ad420e025dfbb51afff" }
53+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "df5ad5825bdb7c65fa3d5ad420e025dfbb51afff" }
54+
iceberg-rust-spec = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "df5ad5825bdb7c65fa3d5ad420e025dfbb51afff" }
55+
iceberg-s3tables-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "df5ad5825bdb7c65fa3d5ad420e025dfbb51afff" }
5656
indexmap = "2.7.1"
5757
jsonwebtoken = "9.3.1"
5858
lazy_static = { version = "1.5" }

crates/catalog/src/catalogs/embucket/schema.rs

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
use crate::snowflake_table::CaseInsensitiveTable;
21
use crate::{block_in_new_runtime, error};
32
use async_trait::async_trait;
43
use catalog_metastore::error as metastore_error;
@@ -95,10 +94,8 @@ impl SchemaProvider for EmbucketSchema {
9594
.await
9695
.map_err(|e| DataFusionError::External(Box::new(e)))?;
9796
let tabular = IcebergTabular::Table(iceberg_table);
98-
99-
let table_provider: Arc<dyn TableProvider> = Arc::new(CaseInsensitiveTable::new(
100-
Arc::new(IcebergDataFusionTable::new(tabular, None, None, None)),
101-
));
97+
let table_provider: Arc<dyn TableProvider> =
98+
Arc::new(IcebergDataFusionTable::new(tabular, None, None, None));
10299
Ok(Some(table_provider))
103100
}
104101
Ok(None) => Ok(None),

crates/catalog/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,6 @@ pub mod df_error;
1010
pub mod error;
1111
pub mod information_schema;
1212
pub mod schema;
13-
pub mod snowflake_table;
1413
pub mod table;
1514
pub mod utils;
1615

crates/catalog/src/schema.rs

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
use crate::snowflake_table::CaseInsensitiveTable;
21
use crate::table::{CachingTable, IcebergTableBuilder};
32
use crate::{block_in_new_runtime, error};
43
use async_trait::async_trait;
@@ -116,9 +115,8 @@ impl SchemaProvider for CachingSchema {
116115
.await
117116
.context(error::IcebergSnafu)?;
118117
let tabular = IcebergTabular::Table(iceberg_table);
119-
let table_provider: Arc<dyn TableProvider> = Arc::new(CaseInsensitiveTable::new(
120-
Arc::new(DataFusionTable::new(tabular, None, None, None)),
121-
));
118+
let table_provider: Arc<dyn TableProvider> =
119+
Arc::new(DataFusionTable::new(tabular, None, None, None));
122120
Ok(table_provider)
123121
})
124122
.map_err(|err| DataFusionError::External(Box::new(err)))?

crates/catalog/src/snowflake_table.rs

Lines changed: 0 additions & 167 deletions
This file was deleted.

crates/catalog/src/utils.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
use datafusion::arrow::datatypes::Schema;
12
use datafusion::catalog::{SchemaProvider, TableProvider};
23
use datafusion_common::Result as DataFusionResult;
34
use futures::stream::{self, StreamExt};
@@ -47,3 +48,17 @@ pub async fn fetch_table_providers(
4748

4849
Ok(tables)
4950
}
51+
52+
#[must_use]
53+
pub fn normalize_schema_case(schema: &Schema) -> Schema {
54+
let fields = schema
55+
.fields()
56+
.iter()
57+
.map(|field| {
58+
let mut cloned = field.as_ref().clone();
59+
cloned.set_name(field.name().to_ascii_lowercase());
60+
cloned
61+
})
62+
.collect::<Vec<_>>();
63+
Schema::new(fields)
64+
}

0 commit comments

Comments
 (0)