diff --git a/native/Cargo.lock b/native/Cargo.lock
index 5f99c614b3..5b3f7e885e 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1885,7 +1885,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-common"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "arrow",
  "datafusion",
@@ -1911,7 +1911,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-jni-bridge"
-version = "0.14.0"
+version = "0.15.0"
 dependencies = [
  "arrow",
  "assertables",
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index d24a6a503e..a59a349bf9 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -23,6 +23,7 @@ pub use mutable_vector::*;
 #[macro_use]
 pub mod util;
 pub mod parquet_exec;
+pub mod parquet_read_cached_factory;
 pub mod parquet_support;
 pub mod read;
 pub mod schema_adapter;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 2d970734bb..ef4c878b9a 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -17,6 +17,7 @@
 
 use crate::execution::operators::ExecutionError;
 use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
+use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory;
 use crate::parquet::parquet_support::SparkParquetOptions;
 use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
 use arrow::datatypes::{Field, SchemaRef};
@@ -149,6 +150,11 @@ pub(crate) fn init_datasource_exec(
         );
     }
 
+    // Use caching reader factory to avoid redundant footer reads across partitions
+    let store = session_ctx.runtime_env().object_store(&object_store_url)?;
+    parquet_source = parquet_source
+        .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
+
     let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
         SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
     );
diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs b/native/core/src/parquet/parquet_read_cached_factory.rs
new file mode 100644
index 0000000000..f90e53411f
--- /dev/null
+++ b/native/core/src/parquet/parquet_read_cached_factory.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A `ParquetFileReaderFactory` that caches parquet footer metadata across
+//! partitions within a single scan. When multiple Spark partitions read from
+//! the same parquet file (different row group ranges), this avoids redundant
+//! footer reads and parsing.
+//!
+//! The cache is scoped to the factory instance (one per scan), not global,
+//! so it does not persist across queries.
+//!
+//! Uses `tokio::sync::OnceCell` per file path so that concurrent partitions
+//! wait for the first reader to load the footer rather than all racing.
+
+use bytes::Bytes;
+use datafusion::common::Result as DataFusionResult;
+use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_datasource::PartitionedFile;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+use tokio::sync::OnceCell;
+
+type MetadataCell = Arc<OnceCell<Arc<ParquetMetaData>>>;
+
+/// A `ParquetFileReaderFactory` that caches footer metadata by file path.
+/// The cache is scoped to this factory instance (shared across partitions
+/// within a single scan via Arc), not global.
+#[derive(Debug)]
+pub struct CachingParquetReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    cache: Arc<Mutex<HashMap<Path, MetadataCell>>>,
+}
+
+impl CachingParquetReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            store,
+            cache: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachingParquetReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        partitioned_file: PartitionedFile,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
+        let bytes_scanned = MetricBuilder::new(metrics).counter("bytes_scanned", partition_index);
+
+        let location = partitioned_file.object_meta.location.clone();
+
+        // Get or create the OnceCell for this file path
+        let cell = Arc::<OnceCell<Arc<ParquetMetaData>>>::clone(
+            self.cache
+                .lock()
+                .unwrap()
+                .entry(location.clone())
+                .or_insert_with(|| Arc::new(OnceCell::new())),
+        );
+
+        let mut inner = ParquetObjectReader::new(Arc::clone(&self.store), location.clone())
+            .with_file_size(partitioned_file.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint);
+        }
+
+        Ok(Box::new(CachingParquetFileReader {
+            inner,
+            location,
+            cell,
+            bytes_scanned,
+        }))
+    }
+}
+
+struct CachingParquetFileReader {
+    inner: ParquetObjectReader,
+    location: Path,
+    cell: MetadataCell,
+    bytes_scanned: datafusion::physical_plan::metrics::Count,
+}
+
+impl AsyncFileReader for CachingParquetFileReader {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.bytes_scanned.add((range.end - range.start) as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let cell = Arc::clone(&self.cell);
+        let location = self.location.clone();
+
+        async move {
+            let metadata = cell
+                .get_or_try_init(|| async {
+                    log::trace!("CachingParquetFileReader: loading footer for {}", location);
+                    self.inner.get_metadata(options).await
+                })
+                .await?;
+
+            log::trace!("CachingParquetFileReader: cache HIT for {}", self.location);
+            Ok(Arc::clone(metadata))
+        }
+        .boxed()
+    }
+}
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 0ad61df426..535ed70627 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -66,20 +66,18 @@ fn remap_physical_schema_names(
     logical_schema: &SchemaRef,
     physical_schema: &SchemaRef,
 ) -> SchemaRef {
-    let logical_names: HashMap<String, &str> = logical_schema
-        .fields()
-        .iter()
-        .map(|f| (f.name().to_lowercase(), f.name().as_str()))
-        .collect();
-
     let remapped_fields: Vec<_> = physical_schema
         .fields()
         .iter()
         .map(|field| {
-            if let Some(logical_name) = logical_names.get(&field.name().to_lowercase()) {
-                if *logical_name != field.name() {
+            if let Some(logical_field) = logical_schema
+                .fields()
+                .iter()
+                .find(|lf| lf.name().eq_ignore_ascii_case(field.name()))
+            {
+                if logical_field.name() != field.name() {
                     Arc::new(Field::new(
-                        *logical_name,
+                        logical_field.name(),
                         field.data_type().clone(),
                         field.is_nullable(),
                     ))
@@ -121,7 +119,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
                 .fields()
                 .iter()
                 .find(|pf| {
-                    pf.name().to_lowercase() == logical_field.name().to_lowercase()
+                    pf.name().eq_ignore_ascii_case(logical_field.name())
                         && pf.name() != logical_field.name()
                 })
                 .map(|pf| (logical_field.name().clone(), pf.name().clone()))
@@ -264,7 +262,7 @@ impl SparkPhysicalExprAdapter {
             self.physical_file_schema
                 .fields()
                 .iter()
-                .find(|f| f.name().to_lowercase() == col_name.to_lowercase())
+                .find(|f| f.name().eq_ignore_ascii_case(col_name))
         };
 
         if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
@@ -530,9 +528,7 @@ mod test {
         let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
 
-        let mut stream = parquet_exec
-            .execute(0, Arc::new(TaskContext::default()))
-            .unwrap();
+        let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?;
 
         stream.next().await.unwrap()
     }
 }