From ba4ebfdce7274b5ff2973a26da145abba23c1c6f Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 21 Mar 2026 12:36:15 -0700 Subject: [PATCH 1/6] chore: Experiments with `native_datafusion` scan optimizations --- native/Cargo.lock | 4 +- native/core/src/parquet/caching_reader.rs | 158 ++++++++++++++++++++++ native/core/src/parquet/mod.rs | 1 + native/core/src/parquet/parquet_exec.rs | 8 ++ native/core/src/parquet/schema_adapter.rs | 28 ++-- 5 files changed, 186 insertions(+), 13 deletions(-) create mode 100644 native/core/src/parquet/caching_reader.rs diff --git a/native/Cargo.lock b/native/Cargo.lock index 5f99c614b3..5b3f7e885e 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1885,7 +1885,7 @@ dependencies = [ [[package]] name = "datafusion-comet-common" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "datafusion", @@ -1911,7 +1911,7 @@ dependencies = [ [[package]] name = "datafusion-comet-jni-bridge" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "assertables", diff --git a/native/core/src/parquet/caching_reader.rs b/native/core/src/parquet/caching_reader.rs new file mode 100644 index 0000000000..5b498be90a --- /dev/null +++ b/native/core/src/parquet/caching_reader.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A `ParquetFileReaderFactory` that caches parquet footer metadata across +//! partitions. When multiple Spark partitions read from the same parquet file +//! (different row group ranges), this avoids redundant footer reads and parsing. +//! +//! Uses `tokio::sync::OnceCell` per file path so that concurrent partitions +//! wait for the first reader to load the footer rather than all racing. + +use bytes::Bytes; +use datafusion::common::Result as DataFusionResult; +use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; +use datafusion_datasource::PartitionedFile; +use futures::future::BoxFuture; +use futures::FutureExt; +use object_store::path::Path; +use object_store::ObjectStore; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; +use parquet::file::metadata::ParquetMetaData; +use std::collections::HashMap; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::{Arc, Mutex}; +use tokio::sync::OnceCell; + +type MetadataCell = Arc>>; + +/// Global cache: maps file path -> OnceCell that will hold the metadata once loaded. +/// The OnceCell ensures only one task fetches the footer; others await the result. +static METADATA_CACHE: std::sync::LazyLock>> = + std::sync::LazyLock::new(|| Mutex::new(HashMap::new())); + +/// Clears the metadata cache. Call between queries if needed. +pub fn clear_metadata_cache() { + METADATA_CACHE.lock().unwrap().clear(); +} + +/// A `ParquetFileReaderFactory` that caches footer metadata by file path. +#[derive(Debug)] +pub struct CachingParquetReaderFactory { + store: Arc, +} + +impl CachingParquetReaderFactory { + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +impl ParquetFileReaderFactory for CachingParquetReaderFactory { + fn create_reader( + &self, + partition_index: usize, + partitioned_file: PartitionedFile, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> DataFusionResult> { + let bytes_scanned = + MetricBuilder::new(metrics).counter("bytes_scanned", partition_index); + + let location = partitioned_file.object_meta.location.clone(); + + // Get or create the OnceCell for this file path + let cell = METADATA_CACHE + .lock() + .unwrap() + .entry(location.clone()) + .or_insert_with(|| Arc::new(OnceCell::new())) + .clone(); + + let mut inner = ParquetObjectReader::new(Arc::clone(&self.store), location.clone()) + .with_file_size(partitioned_file.object_meta.size); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint); + } + + Ok(Box::new(CachingParquetFileReader { + inner, + location, + cell, + bytes_scanned, + })) + } +} + +struct CachingParquetFileReader { + inner: ParquetObjectReader, + location: Path, + cell: MetadataCell, + bytes_scanned: datafusion::physical_plan::metrics::Count, +} + +impl AsyncFileReader for CachingParquetFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + self.bytes_scanned.add((range.end - range.start) as usize); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + let total: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + self.bytes_scanned.add(total as usize); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let cell = Arc::clone(&self.cell); + let location = self.location.clone(); + + async move { + let metadata = cell + .get_or_try_init(|| async { + log::trace!( + "CachingParquetFileReader: loading footer for {}", + location + ); + self.inner.get_metadata(options).await + }) + .await?; + + log::trace!( + "CachingParquetFileReader: cache HIT for {}", + self.location + ); + Ok(Arc::clone(metadata)) + } + .boxed() + } +} diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index d24a6a503e..3f588b1bd6 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -22,6 +22,7 @@ pub use mutable_vector::*; #[macro_use] pub mod util; +pub mod caching_reader; pub mod parquet_exec; pub mod parquet_support; pub mod read; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 2d970734bb..d2b3b104bc 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -17,6 +17,7 @@ use crate::execution::operators::ExecutionError; use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID}; +use crate::parquet::caching_reader::CachingParquetReaderFactory; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory; use arrow::datatypes::{Field, SchemaRef}; @@ -149,6 +150,13 @@ pub(crate) fn init_datasource_exec( ); } + // Use caching reader factory to avoid redundant footer reads across partitions + let store = session_ctx + .runtime_env() + .object_store(&object_store_url)?; + parquet_source = parquet_source + .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store))); + let expr_adapter_factory: Arc = Arc::new( SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values), ); diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 0ad61df426..ec818c112f 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -76,7 +76,8 @@ fn remap_physical_schema_names( .fields() .iter() .map(|field| { - if let Some(logical_name) = logical_names.get(&field.name().to_lowercase()) { + let field_lower = field.name().to_lowercase(); + if let Some(logical_name) = logical_names.get(&field_lower) { if *logical_name != field.name() { Arc::new(Field::new( *logical_name, @@ -113,18 +114,23 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { // schema, which uses the original physical file column names. let (adapted_physical_schema, logical_to_physical_names) = if !self.parquet_options.case_sensitive { + // Pre-compute lowercased physical field names to avoid repeated + // to_lowercase() calls in the O(n*m) matching loop. + let physical_lower: Vec<(String, &str)> = physical_file_schema + .fields() + .iter() + .map(|f| (f.name().to_lowercase(), f.name().as_str())) + .collect(); + let logical_to_physical: HashMap = logical_file_schema .fields() .iter() .filter_map(|logical_field| { - physical_file_schema - .fields() + let logical_lower = logical_field.name().to_lowercase(); + physical_lower .iter() - .find(|pf| { - pf.name().to_lowercase() == logical_field.name().to_lowercase() - && pf.name() != logical_field.name() - }) - .map(|pf| (logical_field.name().clone(), pf.name().clone())) + .find(|(pl, orig)| *pl == logical_lower && *orig != logical_field.name()) + .map(|(_, orig)| (logical_field.name().clone(), orig.to_string())) }) .collect(); let remapped = @@ -261,10 +267,11 @@ impl SparkPhysicalExprAdapter { .iter() .find(|f| f.name() == col_name) } else { + let col_lower = col_name.to_lowercase(); self.physical_file_schema .fields() .iter() - .find(|f| f.name().to_lowercase() == col_name.to_lowercase()) + .find(|f| f.name().to_lowercase() == col_lower) }; if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field) @@ -531,8 +538,7 @@ mod test { let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); let mut stream = parquet_exec - .execute(0, Arc::new(TaskContext::default())) - .unwrap(); + .execute(0, Arc::new(TaskContext::default()))?; stream.next().await.unwrap() } } From c1f171848357c75ae878dce87165f34489d6b1d7 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 21 Mar 2026 12:48:02 -0700 Subject: [PATCH 2/6] fmt --- native/core/src/parquet/mod.rs | 2 +- native/core/src/parquet/parquet_exec.rs | 6 +- ...ader.rs => parquet_read_cached_factory.rs} | 30 +++----- native/core/src/parquet/schema_adapter.rs | 72 +++++++++---------- 4 files changed, 50 insertions(+), 60 deletions(-) rename native/core/src/parquet/{caching_reader.rs => parquet_read_cached_factory.rs} (87%) diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 3f588b1bd6..a59a349bf9 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -22,8 +22,8 @@ pub use mutable_vector::*; #[macro_use] pub mod util; -pub mod caching_reader; pub mod parquet_exec; +pub mod parquet_read_cached_factory; pub mod parquet_support; pub mod read; pub mod schema_adapter; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index d2b3b104bc..ef4c878b9a 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -17,7 +17,7 @@ use crate::execution::operators::ExecutionError; use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID}; -use crate::parquet::caching_reader::CachingParquetReaderFactory; +use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory; use arrow::datatypes::{Field, SchemaRef}; @@ -151,9 +151,7 @@ pub(crate) fn init_datasource_exec( } // Use caching reader factory to avoid redundant footer reads across partitions - let store = session_ctx - .runtime_env() - .object_store(&object_store_url)?; + let store = session_ctx.runtime_env().object_store(&object_store_url)?; parquet_source = parquet_source .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store))); diff --git a/native/core/src/parquet/caching_reader.rs b/native/core/src/parquet/parquet_read_cached_factory.rs similarity index 87% rename from native/core/src/parquet/caching_reader.rs rename to native/core/src/parquet/parquet_read_cached_factory.rs index 5b498be90a..bb42b1511e 100644 --- a/native/core/src/parquet/caching_reader.rs +++ b/native/core/src/parquet/parquet_read_cached_factory.rs @@ -73,17 +73,18 @@ impl ParquetFileReaderFactory for CachingParquetReaderFactory { metrics: &ExecutionPlanMetricsSet, ) -> DataFusionResult> { let bytes_scanned = - MetricBuilder::new(metrics).counter("bytes_scanned", partition_index); + MetricBuilder::new(metrics).counter("parquet bytes_scanned", partition_index); let location = partitioned_file.object_meta.location.clone(); // Get or create the OnceCell for this file path - let cell = METADATA_CACHE - .lock() - .unwrap() - .entry(location.clone()) - .or_insert_with(|| Arc::new(OnceCell::new())) - .clone(); + let cell = Arc::>>::clone( + METADATA_CACHE + .lock() + .unwrap() + .entry(location.clone()) + .or_insert_with(|| Arc::new(OnceCell::new())), + ); let mut inner = ParquetObjectReader::new(Arc::clone(&self.store), location.clone()) .with_file_size(partitioned_file.object_meta.size); @@ -109,10 +110,7 @@ struct CachingParquetFileReader { } impl AsyncFileReader for CachingParquetFileReader { - fn get_bytes( - &mut self, - range: Range, - ) -> BoxFuture<'_, parquet::errors::Result> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { self.bytes_scanned.add((range.end - range.start) as usize); self.inner.get_bytes(range) } @@ -139,18 +137,12 @@ impl AsyncFileReader for CachingParquetFileReader { async move { let metadata = cell .get_or_try_init(|| async { - log::trace!( - "CachingParquetFileReader: loading footer for {}", - location - ); + log::trace!("CachingParquetFileReader: loading footer for {}", location); self.inner.get_metadata(options).await }) .await?; - log::trace!( - "CachingParquetFileReader: cache HIT for {}", - self.location - ); + log::trace!("CachingParquetFileReader: cache HIT for {}", self.location); Ok(Arc::clone(metadata)) } .boxed() diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index ec818c112f..ab0d02ddf3 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -112,40 +112,41 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { // to the original physical names. This is necessary because downstream code // (reassign_expr_columns) looks up columns by name in the actual stream // schema, which uses the original physical file column names. - let (adapted_physical_schema, logical_to_physical_names) = - if !self.parquet_options.case_sensitive { - // Pre-compute lowercased physical field names to avoid repeated - // to_lowercase() calls in the O(n*m) matching loop. - let physical_lower: Vec<(String, &str)> = physical_file_schema - .fields() - .iter() - .map(|f| (f.name().to_lowercase(), f.name().as_str())) - .collect(); - - let logical_to_physical: HashMap = logical_file_schema - .fields() - .iter() - .filter_map(|logical_field| { - let logical_lower = logical_field.name().to_lowercase(); - physical_lower - .iter() - .find(|(pl, orig)| *pl == logical_lower && *orig != logical_field.name()) - .map(|(_, orig)| (logical_field.name().clone(), orig.to_string())) - }) - .collect(); - let remapped = - remap_physical_schema_names(&logical_file_schema, &physical_file_schema); - ( - remapped, - if logical_to_physical.is_empty() { - None - } else { - Some(logical_to_physical) - }, - ) - } else { - (Arc::clone(&physical_file_schema), None) - }; + let (adapted_physical_schema, logical_to_physical_names) = if !self + .parquet_options + .case_sensitive + { + // Pre-compute lowercased physical field names to avoid repeated + // to_lowercase() calls in the O(n*m) matching loop. + let physical_lower: Vec<(String, &str)> = physical_file_schema + .fields() + .iter() + .map(|f| (f.name().to_lowercase(), f.name().as_str())) + .collect(); + + let logical_to_physical: HashMap = logical_file_schema + .fields() + .iter() + .filter_map(|logical_field| { + let logical_lower = logical_field.name().to_lowercase(); + physical_lower + .iter() + .find(|(pl, orig)| *pl == logical_lower && *orig != logical_field.name()) + .map(|(_, orig)| (logical_field.name().clone(), orig.to_string())) + }) + .collect(); + let remapped = remap_physical_schema_names(&logical_file_schema, &physical_file_schema); + ( + remapped, + if logical_to_physical.is_empty() { + None + } else { + Some(logical_to_physical) + }, + ) + } else { + (Arc::clone(&physical_file_schema), None) + }; let default_factory = DefaultPhysicalExprAdapterFactory; let default_adapter = default_factory.create( @@ -537,8 +538,7 @@ mod test { let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut stream = parquet_exec - .execute(0, Arc::new(TaskContext::default()))?; + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; stream.next().await.unwrap() } } From 2d96edaeffdc46d8378929e19c1a4609539701b7 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 21 Mar 2026 21:22:59 -0700 Subject: [PATCH 3/6] fmt --- native/core/src/parquet/parquet_read_cached_factory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs b/native/core/src/parquet/parquet_read_cached_factory.rs index bb42b1511e..948213b8af 100644 --- a/native/core/src/parquet/parquet_read_cached_factory.rs +++ b/native/core/src/parquet/parquet_read_cached_factory.rs @@ -73,7 +73,7 @@ impl ParquetFileReaderFactory for CachingParquetReaderFactory { metrics: &ExecutionPlanMetricsSet, ) -> DataFusionResult> { let bytes_scanned = - MetricBuilder::new(metrics).counter("parquet bytes_scanned", partition_index); + MetricBuilder::new(metrics).counter("bytes_scanned", partition_index); let location = partitioned_file.object_meta.location.clone(); From 052015a74ffed7d3de8a1a7919a3eb16d5e8f964 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 21 Mar 2026 21:28:50 -0700 Subject: [PATCH 4/6] fmt --- native/core/src/parquet/parquet_read_cached_factory.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs b/native/core/src/parquet/parquet_read_cached_factory.rs index 948213b8af..fe37805c42 100644 --- a/native/core/src/parquet/parquet_read_cached_factory.rs +++ b/native/core/src/parquet/parquet_read_cached_factory.rs @@ -72,8 +72,7 @@ impl ParquetFileReaderFactory for CachingParquetReaderFactory { metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> DataFusionResult> { - let bytes_scanned = - MetricBuilder::new(metrics).counter("bytes_scanned", partition_index); + let bytes_scanned = MetricBuilder::new(metrics).counter("bytes_scanned", partition_index); let location = partitioned_file.object_meta.location.clone(); From a1198400e013eec8134db2e3621bb2e00f255242 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 22 Mar 2026 09:54:25 -0700 Subject: [PATCH 5/6] eq_ignore_ascii_case --- native/core/src/parquet/schema_adapter.rs | 84 ++++++++++------------- 1 file changed, 37 insertions(+), 47 deletions(-) diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index ab0d02ddf3..535ed70627 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -66,21 +66,18 @@ fn remap_physical_schema_names( logical_schema: &SchemaRef, physical_schema: &SchemaRef, ) -> SchemaRef { - let logical_names: HashMap = logical_schema - .fields() - .iter() - .map(|f| (f.name().to_lowercase(), f.name().as_str())) - .collect(); - let remapped_fields: Vec<_> = physical_schema .fields() .iter() .map(|field| { - let field_lower = field.name().to_lowercase(); - if let Some(logical_name) = logical_names.get(&field_lower) { - if *logical_name != field.name() { + if let Some(logical_field) = logical_schema + .fields() + .iter() + .find(|lf| lf.name().eq_ignore_ascii_case(field.name())) + { + if logical_field.name() != field.name() { Arc::new(Field::new( - *logical_name, + logical_field.name(), field.data_type().clone(), field.is_nullable(), )) @@ -112,41 +109,35 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { // to the original physical names. This is necessary because downstream code // (reassign_expr_columns) looks up columns by name in the actual stream // schema, which uses the original physical file column names. - let (adapted_physical_schema, logical_to_physical_names) = if !self - .parquet_options - .case_sensitive - { - // Pre-compute lowercased physical field names to avoid repeated - // to_lowercase() calls in the O(n*m) matching loop. - let physical_lower: Vec<(String, &str)> = physical_file_schema - .fields() - .iter() - .map(|f| (f.name().to_lowercase(), f.name().as_str())) - .collect(); - - let logical_to_physical: HashMap = logical_file_schema - .fields() - .iter() - .filter_map(|logical_field| { - let logical_lower = logical_field.name().to_lowercase(); - physical_lower - .iter() - .find(|(pl, orig)| *pl == logical_lower && *orig != logical_field.name()) - .map(|(_, orig)| (logical_field.name().clone(), orig.to_string())) - }) - .collect(); - let remapped = remap_physical_schema_names(&logical_file_schema, &physical_file_schema); - ( - remapped, - if logical_to_physical.is_empty() { - None - } else { - Some(logical_to_physical) - }, - ) - } else { - (Arc::clone(&physical_file_schema), None) - }; + let (adapted_physical_schema, logical_to_physical_names) = + if !self.parquet_options.case_sensitive { + let logical_to_physical: HashMap = logical_file_schema + .fields() + .iter() + .filter_map(|logical_field| { + physical_file_schema + .fields() + .iter() + .find(|pf| { + pf.name().eq_ignore_ascii_case(logical_field.name()) + && pf.name() != logical_field.name() + }) + .map(|pf| (logical_field.name().clone(), pf.name().clone())) + }) + .collect(); + let remapped = + remap_physical_schema_names(&logical_file_schema, &physical_file_schema); + ( + remapped, + if logical_to_physical.is_empty() { + None + } else { + Some(logical_to_physical) + }, + ) + } else { + (Arc::clone(&physical_file_schema), None) + }; let default_factory = DefaultPhysicalExprAdapterFactory; let default_adapter = default_factory.create( @@ -268,11 +259,10 @@ impl SparkPhysicalExprAdapter { .iter() .find(|f| f.name() == col_name) } else { - let col_lower = col_name.to_lowercase(); self.physical_file_schema .fields() .iter() - .find(|f| f.name().to_lowercase() == col_lower) + .find(|f| f.name().eq_ignore_ascii_case(col_name)) }; if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field) From bec4fc6a5996fbde13cfea4f3a6ca164776e79bb Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 22 Mar 2026 12:20:56 -0700 Subject: [PATCH 6/6] make cache instance specific --- .../parquet/parquet_read_cached_factory.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs b/native/core/src/parquet/parquet_read_cached_factory.rs index fe37805c42..f90e53411f 100644 --- a/native/core/src/parquet/parquet_read_cached_factory.rs +++ b/native/core/src/parquet/parquet_read_cached_factory.rs @@ -16,8 +16,12 @@ // under the License. //! A `ParquetFileReaderFactory` that caches parquet footer metadata across -//! partitions. When multiple Spark partitions read from the same parquet file -//! (different row group ranges), this avoids redundant footer reads and parsing. +//! partitions within a single scan. When multiple Spark partitions read from +//! the same parquet file (different row group ranges), this avoids redundant +//! footer reads and parsing. +//! +//! The cache is scoped to the factory instance (one per scan), not global, +//! so it does not persist across queries. //! //! Uses `tokio::sync::OnceCell` per file path so that concurrent partitions //! wait for the first reader to load the footer rather than all racing. @@ -42,25 +46,21 @@ use tokio::sync::OnceCell; type MetadataCell = Arc>>; -/// Global cache: maps file path -> OnceCell that will hold the metadata once loaded. -/// The OnceCell ensures only one task fetches the footer; others await the result. -static METADATA_CACHE: std::sync::LazyLock>> = - std::sync::LazyLock::new(|| Mutex::new(HashMap::new())); - -/// Clears the metadata cache. Call between queries if needed. -pub fn clear_metadata_cache() { - METADATA_CACHE.lock().unwrap().clear(); -} - /// A `ParquetFileReaderFactory` that caches footer metadata by file path. +/// The cache is scoped to this factory instance (shared across partitions +/// within a single scan via Arc), not global. #[derive(Debug)] pub struct CachingParquetReaderFactory { store: Arc, + cache: Arc>>, } impl CachingParquetReaderFactory { pub fn new(store: Arc) -> Self { - Self { store } + Self { + store, + cache: Arc::new(Mutex::new(HashMap::new())), + } } } @@ -78,7 +78,7 @@ impl ParquetFileReaderFactory for CachingParquetReaderFactory { // Get or create the OnceCell for this file path let cell = Arc::>>::clone( - METADATA_CACHE + self.cache .lock() .unwrap() .entry(location.clone())