From 8e31e8b7d339de28f24722c9b82655ed9232a26a Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Mon, 2 Mar 2026 17:19:15 +0530 Subject: [PATCH 1/2] fix: proper listing of hottier --- src/hottier.rs | 85 ++++++++++++++++++----------- src/query/stream_schema_provider.rs | 10 +--- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 91e9ec361..375a787d3 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -199,7 +199,8 @@ impl HotTierManager { .await?; let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?; - stream_hot_tier.oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; + stream_hot_tier.oldest_date_time_entry = + self.get_oldest_date_time_entry(stream, tenant_id).await?; Ok(stream_hot_tier) } @@ -450,7 +451,7 @@ impl HotTierManager { self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id) .await?; file_processed = true; - let path = self.get_stream_path_for_date(stream, &date); + let path = self.get_stream_path_for_date(stream, &date, tenant_id); let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?; hot_tier_manifest.files.push(parquet_file.clone()); hot_tier_manifest @@ -458,7 +459,7 @@ impl HotTierManager { .sort_by_key(|file| file.file_path.clone()); // write the manifest file to the hot tier directory let manifest_path = self - .get_stream_path_for_date(stream, &date) + .get_stream_path_for_date(stream, &date, tenant_id) .join("hottier.manifest.json"); fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; @@ -467,9 +468,18 @@ impl HotTierManager { } ///fetch the list of dates available in the hot tier directory for the stream and sort them - pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result, HotTierError> { + pub async fn fetch_hot_tier_dates( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result, HotTierError> { let mut date_list = Vec::new(); - let path = self.hot_tier_path.join(stream); + let path = if let Some(tenant) = tenant_id.as_ref() { + self.hot_tier_path.join(tenant).join(stream) + } else { + self.hot_tier_path.join(stream) + }; + // let path = self.hot_tier_path.join(stream); if !path.exists() { return Ok(date_list); } @@ -524,37 +534,46 @@ impl HotTierManager { } /// get hot tier path for the stream and date - pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf { - self.hot_tier_path.join(stream).join(format!("date={date}")) + pub fn get_stream_path_for_date( + &self, + stream: &str, + date: &NaiveDate, + tenant_id: &Option, + ) -> PathBuf { + if let Some(tenant) = tenant_id.as_ref() { + self.hot_tier_path + .join(tenant) + .join(stream) + .join(format!("date={date}")) + } else { + self.hot_tier_path.join(stream).join(format!("date={date}")) + } } /// Returns the list of manifest files present in hot tier directory for the stream pub async fn get_hot_tier_manifest_files( &self, - stream: &str, manifest_files: &mut Vec, ) -> Result, HotTierError> { - // Fetch the list of hot tier parquet files for the given stream. - let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?; - - // Retain only the files in `hot_tier_files` that also exist in `manifest_files`. - hot_tier_files.retain(|file| { - manifest_files - .iter() - .any(|manifest_file| manifest_file.file_path.eq(&file.file_path)) + // Instead of reading all hot tier manifests from disk (expensive I/O on every query), + // check which query-relevant files exist locally in the hot tier directory. + let mut hot_tier_files = Vec::new(); + + manifest_files.retain(|file| { + let hot_tier_path = self.hot_tier_path.join(&file.file_path); + if hot_tier_path.exists() + && let Ok(meta) = std::fs::metadata(&hot_tier_path) + && meta.len() == file.file_size + { + hot_tier_files.push(file.clone()); + false + } else { + true + } }); - // Sort `hot_tier_files` in descending order by file path. + // Sort both lists in descending order by file path. hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); - - // Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`. - manifest_files.retain(|manifest_file| { - hot_tier_files - .iter() - .all(|file| !file.file_path.eq(&manifest_file.file_path)) - }); - - // Sort `manifest_files` in descending order by file path. manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); Ok(hot_tier_files) @@ -564,16 +583,17 @@ impl HotTierManager { pub async fn get_hot_tier_parquet_files( &self, stream: &str, + tenant_id: &Option, ) -> Result, HotTierError> { // Fetch list of dates for the given stream - let date_list = self.fetch_hot_tier_dates(stream).await?; + let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?; // Create an unordered iter of futures to async collect files let mut tasks = FuturesUnordered::new(); // For each date, fetch the manifest and extract parquet files for date in date_list { - let path = self.get_stream_path_for_date(stream, &date); + let path = self.get_stream_path_for_date(stream, &date, tenant_id); tasks.push(async move { HotTierManager::get_hot_tier_manifest_from_path(path) .await @@ -621,9 +641,9 @@ impl HotTierManager { tenant_id: &Option, ) -> Result { let mut delete_successful = false; - let dates = self.fetch_hot_tier_dates(stream).await?; + let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; 'loop_dates: for date in dates { - let path = self.get_stream_path_for_date(stream, &date); + let path = self.get_stream_path_for_date(stream, &date, tenant_id); if !path.exists() { continue; } @@ -712,14 +732,15 @@ impl HotTierManager { pub async fn get_oldest_date_time_entry( &self, stream: &str, + tenant_id: &Option, ) -> Result, HotTierError> { - let date_list = self.fetch_hot_tier_dates(stream).await?; + let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?; if date_list.is_empty() { return Ok(None); } for date in date_list { - let path = self.get_stream_path_for_date(stream, &date); + let path = self.get_stream_path_for_date(stream, &date, tenant_id); let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?); let mut hours: Vec = hours_dir.try_collect().await?; hours.retain(|entry| { diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index ccf31764b..387a6e3d5 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -202,11 +202,11 @@ impl StandardTableProvider { time_partition: Option, ) -> Result<(), DataFusionError> { let hot_tier_files = hot_tier_manager - .get_hot_tier_manifest_files(&self.stream, manifest_files) + .get_hot_tier_manifest_files(manifest_files) .await .map_err(|err| DataFusionError::External(Box::new(err)))?; - let hot_tier_files = hot_tier_files + let hot_tier_files: Vec = hot_tier_files .into_iter() .map(|mut file| { let path = PARSEABLE @@ -221,11 +221,7 @@ impl StandardTableProvider { .collect(); let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files); - // let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() { - // &format!("file:///{tenant_id}/") - // } else { - // "file:///" - // }; + let object_store_url = "file:///"; self.create_parquet_physical_plan( execution_plans, From a5083e597416f2e67cf7ba7bb8e668027e6782af Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 3 Mar 2026 00:31:47 +1100 Subject: [PATCH 2/2] coderabbit fix --- src/hottier.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 375a787d3..d4641dd1f 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -555,22 +555,23 @@ impl HotTierManager { &self, manifest_files: &mut Vec, ) -> Result, HotTierError> { - // Instead of reading all hot tier manifests from disk (expensive I/O on every query), - // check which query-relevant files exist locally in the hot tier directory. + // Check which query-relevant files exist locally in the hot tier directory. let mut hot_tier_files = Vec::new(); + let mut remaining = Vec::with_capacity(manifest_files.len()); - manifest_files.retain(|file| { + for file in manifest_files.drain(..) { let hot_tier_path = self.hot_tier_path.join(&file.file_path); - if hot_tier_path.exists() - && let Ok(meta) = std::fs::metadata(&hot_tier_path) + if let Ok(meta) = fs::metadata(&hot_tier_path).await && meta.len() == file.file_size { - hot_tier_files.push(file.clone()); - false - } else { - true + hot_tier_files.push(file); + continue; } - }); + + remaining.push(file); + } + + *manifest_files = remaining; // Sort both lists in descending order by file path. hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));