From 983679cab66fc16c8d7bea101f8ec930e577b7d3 Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 15:08:39 -0400 Subject: [PATCH 1/7] Add pagination and filtering fields to ListMetricsSplitsQuery --- .../file_backed/file_backed_index/mod.rs | 43 +++++++++++++++++-- .../quickwit-metastore/src/metastore/mod.rs | 31 +++++++++++++ .../src/metastore/postgres/metastore.rs | 32 +++++++++++++- 3 files changed, 102 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 6fd5ce244be..f9088c8d675 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -854,11 +854,34 @@ impl FileBackedIndex { &self, query: &ListMetricsSplitsQuery, ) -> Vec { - self.metrics_splits + let mut splits: Vec<&StoredMetricsSplit> = self + .metrics_splits .values() .filter(|split| metrics_split_matches_query(split, query)) - .cloned() - .collect() + .collect(); + + // Sort by split_id for stable pagination (mirrors Postgres ORDER BY split_id ASC). + splits.sort_unstable_by(|a, b| { + a.metadata.split_id.as_str().cmp(b.metadata.split_id.as_str()) + }); + + // Apply cursor: skip splits up to and including after_split_id. + let splits = if let Some(ref after) = query.after_split_id { + let pos = + splits.partition_point(|s| s.metadata.split_id.as_str() <= after.as_str()); + &splits[pos..] + } else { + &splits[..] + }; + + // Apply limit. + let splits = if let Some(limit) = query.limit { + &splits[..splits.len().min(limit)] + } else { + splits + }; + + splits.iter().map(|s| (*s).clone()).collect() } /// Marks metrics splits for deletion. @@ -979,6 +1002,20 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp } } + // Filter by update timestamp + if let Some(update_timestamp_lte) = query.update_timestamp_lte { + if split.update_timestamp > update_timestamp_lte { + return false; + } + } + + // Filter by max time_range_end (retention policy: splits whose data ends before cutoff) + if let Some(max_time_range_end) = query.max_time_range_end { + if (split.metadata.time_range.end_secs as i64) > max_time_range_end { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index ddfee25afab..beb54e70bfd 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -77,6 +77,13 @@ pub struct ListMetricsSplitsQuery { pub tag_host: Option, /// Limit number of results. pub limit: Option, + /// Only return splits whose split_id are lexicographically after this split + pub after_split_id: Option, + /// Filter splits with update_timestamp <= this value (epoch seconds). + pub update_timestamp_lte: Option, + /// Filter splits whose `time_range_end` (exclusive upper bound) <= this + /// value. + pub max_time_range_end: Option, } impl ListMetricsSplitsQuery { @@ -107,6 +114,30 @@ impl ListMetricsSplitsQuery { self.metric_names = names; self } + + /// Filter splits updated at or before the given timestamp (epoch seconds). + pub fn with_update_timestamp_lte(mut self, timestamp: i64) -> Self { + self.update_timestamp_lte = Some(timestamp); + self + } + + /// Limit number of results returned. + pub fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Set the pagination cursor: return only splits with split_id > `split_id`. + pub fn with_after_split_id(mut self, split_id: impl Into) -> Self { + self.after_split_id = Some(split_id.into()); + self + } + + /// Filter splits whose `time_range_end` (exclusive) <= the given timestamp. + pub fn with_max_time_range_end(mut self, timestamp: i64) -> Self { + self.max_time_range_end = Some(timestamp); + self + } } /// Splits batch size returned by the stream splits API diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index d459ceb243b..a6f93ce96b9 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2221,7 +2221,28 @@ impl MetastoreService for PostgresqlMetastore { param_idx += 1; } - sql.push_str(" ORDER BY time_range_start ASC"); + // Add update_timestamp filter for GC + if query.update_timestamp_lte.is_some() { + sql.push_str(&format!( + " AND update_timestamp <= TO_TIMESTAMP(${})", + param_idx + )); + param_idx += 1; + } + + // Add max_time_range_end filter for retention policy + if query.max_time_range_end.is_some() { + sql.push_str(&format!(" AND time_range_end <= ${}", param_idx)); + param_idx += 1; + } + + // Add pagination cursor + if query.after_split_id.is_some() { + sql.push_str(&format!(" AND split_id > ${}", param_idx)); + param_idx += 1; + } + + sql.push_str(" ORDER BY split_id ASC"); // Add limit if query.limit.is_some() { @@ -2280,6 +2301,15 @@ impl MetastoreService for PostgresqlMetastore { if let Some(ref host) = query.tag_host { query_builder = query_builder.bind(host); } + if let Some(update_timestamp_lte) = query.update_timestamp_lte { + query_builder = query_builder.bind(update_timestamp_lte); + } + if let Some(max_time_range_end) = query.max_time_range_end { + query_builder = query_builder.bind(max_time_range_end); + } + if let Some(ref after_split_id) = query.after_split_id { + query_builder = query_builder.bind(after_split_id); + } if let Some(limit) = query.limit { query_builder = query_builder.bind(limit as i64); } From 4c559d3c3c4404ccd3e74f7860130682d7e260d8 Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 15:20:34 -0400 Subject: [PATCH 2/7] Extract shared storage deletion logic into reusable delete_split_files helper --- .../src/garbage_collection.rs | 109 +++++++++++------- 1 file changed, 69 insertions(+), 40 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 2ca485026ee..3696181b6ba 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -46,7 +46,7 @@ pub struct GcMetrics { pub failed_splits: IntCounter, } -trait RecordGcMetrics { +pub(crate) trait RecordGcMetrics { fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); } @@ -72,7 +72,7 @@ pub struct DeleteSplitsError { metastore_failures: Vec, } -async fn protect_future(progress: Option<&Progress>, future: Fut) -> T +pub(crate) async fn protect_future(progress: Option<&Progress>, future: Fut) -> T where Fut: Future { match progress { None => future.await, @@ -289,7 +289,7 @@ async fn list_splits_metadata( /// In order to avoid hammering the load on the metastore, we can throttle the rate of split /// deletion by setting this environment variable. -fn get_maximum_split_deletion_rate_per_sec() -> Option { +pub(crate) fn get_maximum_split_deletion_rate_per_sec() -> Option { static MAX_SPLIT_DELETION_RATE_PER_SEC: OnceLock> = OnceLock::new(); *MAX_SPLIT_DELETION_RATE_PER_SEC.get_or_init(|| { quickwit_common::get_from_env_opt::("QW_MAX_SPLIT_DELETION_RATE_PER_SEC", false) @@ -408,6 +408,43 @@ async fn delete_splits_marked_for_deletion_several_indexes( split_removal_info } +/// A split normalized for storage deletion: just the id, path, and size. +/// Used as the common currency between tantivy and parquet GC paths. +pub(crate) struct SplitToDelete { + pub split_id: String, + pub path: PathBuf, + pub size_bytes: u64, +} + +/// Deletes split files from storage and partitions into (succeeded, failed). +/// +/// Returns the `BulkDeleteError` if there was a partial failure, so the caller +/// can log it with index-specific context. Does NOT touch the metastore. +pub(crate) async fn delete_split_files( + storage: &dyn Storage, + splits: Vec, + progress_opt: Option<&Progress>, +) -> (Vec, Vec, Option) { + if splits.is_empty() { + return (Vec::new(), Vec::new(), None); + } + let paths: Vec<&Path> = splits.iter().map(|s| s.path.as_path()).collect(); + let result = protect_future(progress_opt, storage.bulk_delete(&paths)).await; + + if let Some(progress) = progress_opt { + progress.record_progress(); + } + match result { + Ok(()) => (splits, Vec::new(), None), + Err(bulk_err) => { + let success_paths: HashSet<&PathBuf> = bulk_err.successes.iter().collect(); + let (succeeded, failed) = + splits.into_iter().partition(|s| success_paths.contains(&s.path)); + (succeeded, failed, Some(bulk_err)) + } + } +} + /// Delete a list of splits from the storage and the metastore. /// It should leave the index and the metastore in good state. /// @@ -424,49 +461,41 @@ pub async fn delete_splits_from_storage_and_metastore( progress_opt: Option<&Progress>, ) -> Result, DeleteSplitsError> { let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); - for split in splits { let split_info = split.as_split_info(); split_infos.insert(split_info.file_name.clone(), split_info); } - let split_paths = split_infos - .keys() - .map(|split_path_buf| split_path_buf.as_path()) - .collect::>(); - let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await; - if let Some(progress) = progress_opt { - progress.record_progress(); - } - let mut successes = Vec::with_capacity(split_infos.len()); + let splits_to_delete: Vec = split_infos + .values() + .map(|info| SplitToDelete { + split_id: info.split_id.clone(), + path: info.file_name.clone(), + size_bytes: info.file_size_bytes.as_u64(), + }) + .collect(); + + let (succeeded_stds, failed_stds, storage_err) = + delete_split_files(&*storage, splits_to_delete, progress_opt).await; + + let successes: Vec = succeeded_stds.iter().map(|s| split_infos[&s.path].clone()).collect(); + let storage_failures: Vec = failed_stds.iter().map(|s| split_infos[&s.path].clone()).collect(); + let mut storage_error: Option = None; - let mut storage_failures = Vec::new(); - - match delete_result { - Ok(_) => successes.extend(split_infos.into_values()), - Err(bulk_delete_error) => { - let success_split_paths: HashSet<&PathBuf> = - bulk_delete_error.successes.iter().collect(); - for (split_path, split_info) in split_infos { - if success_split_paths.contains(&split_path) { - successes.push(split_info); - } else { - storage_failures.push(split_info); - } - } - let failed_split_paths = storage_failures - .iter() - .map(|split_info| split_info.file_name.as_path()) - .collect::>(); - error!( - error=?bulk_delete_error.error, - index_id=index_uid.index_id, - "failed to delete split file(s) {:?} from storage", - PrettySample::new(&failed_split_paths, 5), - ); - storage_error = Some(bulk_delete_error); - } - }; + if let Some(bulk_delete_error) = storage_err { + let failed_split_paths = storage_failures + .iter() + .map(|split_info| split_info.file_name.as_path()) + .collect::>(); + error!( + error=?bulk_delete_error.error, + index_id=index_uid.index_id, + "failed to delete split file(s) {:?} from storage", + PrettySample::new(&failed_split_paths, 5), + ); + storage_error = Some(bulk_delete_error); + } + if !successes.is_empty() { let split_ids: Vec = successes .iter() From 63f87435cac118e9b561d8fab18f85f62b184362 Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 16:03:31 -0400 Subject: [PATCH 3/7] Add parquet garbage collection core with two-phase mark-and-delete --- quickwit/Cargo.lock | 2 + quickwit/quickwit-index-management/Cargo.toml | 1 + quickwit/quickwit-index-management/src/lib.rs | 4 + .../src/parquet_garbage_collection.rs | 648 ++++++++++++++++++ 4 files changed, 655 insertions(+) create mode 100644 quickwit/quickwit-index-management/src/parquet_garbage_collection.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d099964a383..96ccdeeafff 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7264,6 +7264,7 @@ dependencies = [ "quickwit-config", "quickwit-indexing", "quickwit-metastore", + "quickwit-parquet-engine", "quickwit-proto", "quickwit-storage", "thiserror 2.0.18", @@ -7461,6 +7462,7 @@ dependencies = [ "quickwit-index-management", "quickwit-indexing", "quickwit-metastore", + "quickwit-parquet-engine", "quickwit-proto", "quickwit-query", "quickwit-search", diff --git a/quickwit/quickwit-index-management/Cargo.toml b/quickwit/quickwit-index-management/Cargo.toml index 2db7d062f12..d303125f65f 100644 --- a/quickwit/quickwit-index-management/Cargo.toml +++ b/quickwit/quickwit-index-management/Cargo.toml @@ -24,6 +24,7 @@ quickwit-common = { workspace = true } quickwit-config = { workspace = true } quickwit-indexing = { workspace = true } quickwit-metastore = { workspace = true } +quickwit-parquet-engine = { workspace = true } quickwit-proto = { workspace = true } quickwit-storage = { workspace = true } diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 9bc004d4f01..cd35f215825 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -14,6 +14,10 @@ mod garbage_collection; mod index; +mod parquet_garbage_collection; pub use garbage_collection::{GcMetrics, run_garbage_collect}; pub use index::{IndexService, IndexServiceError, clear_cache_directory, validate_storage_uri}; +pub use parquet_garbage_collection::{ + ParquetSplitInfo, ParquetSplitRemovalInfo, run_parquet_garbage_collect, +}; diff --git a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs new file mode 100644 index 00000000000..656a5edb656 --- /dev/null +++ b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs @@ -0,0 +1,648 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use quickwit_common::Progress; +use quickwit_metastore::{ + ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, +}; +use quickwit_parquet_engine::split::{MetricsSplitRecord, MetricsSplitState}; +use quickwit_proto::metastore::{ + DeleteMetricsSplitsRequest, ListMetricsSplitsRequest, MarkMetricsSplitsForDeletionRequest, + MetastoreService, MetastoreServiceClient, +}; +use quickwit_proto::types::IndexUid; +use quickwit_storage::Storage; +use time::OffsetDateTime; +use tracing::{error, info, instrument, warn}; + +use crate::garbage_collection::{ + GcMetrics, RecordGcMetrics, SplitToDelete, delete_split_files, + get_maximum_split_deletion_rate_per_sec, protect_future, +}; + +/// Detail about a single parquet split affected by GC. +#[derive(Debug)] +pub struct ParquetSplitInfo { + pub split_id: String, + pub file_size_bytes: u64, +} + +/// Information on what parquet splits have and have not been cleaned up by the GC. +#[derive(Debug, Default)] +pub struct ParquetSplitRemovalInfo { + pub removed_parquet_splits_entries: Vec, + pub failed_parquet_splits: Vec, +} + +impl ParquetSplitRemovalInfo { + pub fn removed_split_count(&self) -> usize { + self.removed_parquet_splits_entries.len() + } + + pub fn removed_bytes(&self) -> u64 { + self.removed_parquet_splits_entries + .iter() + .map(|s| s.file_size_bytes) + .sum() + } + + pub fn failed_split_count(&self) -> usize { + self.failed_parquet_splits.len() + } +} + +/// Maximum number of parquet splits to process per paginated query. +const DELETE_PARQUET_SPLITS_BATCH_SIZE: usize = 10_000; + +/// Runs garbage collection for parquet splits. +#[instrument(skip_all, fields(num_indexes=%indexes.len()))] +pub async fn run_parquet_garbage_collect( + indexes: HashMap>, + metastore: MetastoreServiceClient, + staged_grace_period: Duration, + deletion_grace_period: Duration, + dry_run: bool, + progress_opt: Option<&Progress>, + metrics: Option, +) -> anyhow::Result { + let mut removal_info = ParquetSplitRemovalInfo::default(); + + // Phase 1: List stale Staged splits + let staged_cutoff = + OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; + + let mut deletable_staged_splits = Vec::new(); + for index_uid in indexes.keys() { + match list_stale_staged_splits(&metastore, index_uid, staged_cutoff, progress_opt).await { + Ok(splits) => deletable_staged_splits.extend(splits), + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to list stale staged parquet splits"); + } + } + } + + if dry_run { + let deletion_cutoff = + OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64; + + let mut splits_marked_for_deletion = Vec::new(); + for index_uid in indexes.keys() { + match list_marked_splits(&metastore, index_uid, deletion_cutoff, progress_opt).await { + Ok(splits) => splits_marked_for_deletion.extend(splits), + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to list marked parquet splits"); + } + } + } + splits_marked_for_deletion.extend(deletable_staged_splits); + + let candidate_entries: Vec = splits_marked_for_deletion + .into_iter() + .map(|s| ParquetSplitInfo { + split_id: s.metadata.split_id.to_string(), + file_size_bytes: s.metadata.size_bytes, + }) + .collect(); + return Ok(ParquetSplitRemovalInfo { + removed_parquet_splits_entries: candidate_entries, + failed_parquet_splits: Vec::new(), + }); + } + + // Schedule all eligible staged splits for delete + if let Err(err) = + mark_splits_for_deletion(&metastore, &deletable_staged_splits, progress_opt).await + { + error!(error=?err, "failed to mark stale staged parquet splits"); + } + + // Phase 2: Delete splits marked for deletion past the grace period + let deletion_cutoff = + OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64; + + for (index_uid, storage) in &indexes { + match delete_marked_parquet_splits( + &metastore, + index_uid, + storage.clone(), + deletion_cutoff, + progress_opt, + ) + .await + { + Ok(batch_info) => { + removal_info + .removed_parquet_splits_entries + .extend(batch_info.removed_parquet_splits_entries); + removal_info + .failed_parquet_splits + .extend(batch_info.failed_parquet_splits); + } + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to delete marked parquet splits"); + } + } + } + + metrics.record( + removal_info.removed_split_count(), + removal_info.removed_bytes(), + removal_info.failed_split_count(), + ); + + Ok(removal_info) +} + +/// Lists Staged parquet splits older than the cutoff. +async fn list_stale_staged_splits( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + staged_cutoff: i64, + progress_opt: Option<&Progress>, +) -> anyhow::Result> { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec![MetricsSplitState::Staged.to_string()]) + .with_update_timestamp_lte(staged_cutoff); + + let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query) + .context("failed to build list metrics splits request")?; + + let response = protect_future(progress_opt, metastore.list_metrics_splits(request)).await?; + response + .deserialize_splits() + .context("failed to deserialize metrics splits") +} + +/// Lists MarkedForDeletion parquet splits older than the cutoff. +async fn list_marked_splits( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + deletion_cutoff: i64, + progress_opt: Option<&Progress>, +) -> anyhow::Result> { + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec![MetricsSplitState::MarkedForDeletion.to_string()]) + .with_update_timestamp_lte(deletion_cutoff); + + let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query) + .context("failed to build list metrics splits request")?; + + let response = protect_future(progress_opt, metastore.list_metrics_splits(request)).await?; + response + .deserialize_splits() + .context("failed to deserialize metrics splits") +} + +/// Marks the given splits for deletion in the metastore, grouped by index. +async fn mark_splits_for_deletion( + metastore: &MetastoreServiceClient, + splits: &[MetricsSplitRecord], + progress_opt: Option<&Progress>, +) -> anyhow::Result<()> { + if splits.is_empty() { + return Ok(()); + } + + // Group split IDs by index_uid string, then resolve to IndexUid for the request. + let mut splits_by_index: HashMap> = HashMap::new(); + for split in splits { + splits_by_index + .entry(split.metadata.index_uid.clone()) + .or_default() + .push(split.metadata.split_id.to_string()); + } + + for (index_uid_str, split_ids) in splits_by_index { + let index_uid: IndexUid = index_uid_str.parse()?; + info!(index_uid=%index_uid, count=%split_ids.len(), "marking stale staged parquet splits for deletion"); + + protect_future( + progress_opt, + metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid), + split_ids, + }), + ) + .await?; + } + + Ok(()) +} + +/// Phase 2: Find MarkedForDeletion parquet splits older than the cutoff, +/// delete their storage files, then delete the metastore entries. +async fn delete_marked_parquet_splits( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + storage: Arc, + deletion_cutoff: i64, + progress_opt: Option<&Progress>, +) -> anyhow::Result { + let mut removal_info = ParquetSplitRemovalInfo::default(); + + let mut query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_split_states(vec![MetricsSplitState::MarkedForDeletion.to_string()]) + .with_update_timestamp_lte(deletion_cutoff) + .with_limit(DELETE_PARQUET_SPLITS_BATCH_SIZE); + + loop { + let sleep_duration = if let Some(max_rate) = get_maximum_split_deletion_rate_per_sec() { + Duration::from_secs(DELETE_PARQUET_SPLITS_BATCH_SIZE.div_ceil(max_rate) as u64) + } else { + Duration::default() + }; + let sleep_future = tokio::time::sleep(sleep_duration); + + let request = match ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query) { + Ok(req) => req, + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to build list metrics splits request"); + break; + } + }; + + let splits: Vec = match protect_future( + progress_opt, + metastore.list_metrics_splits(request), + ) + .await + { + Ok(resp) => match resp.deserialize_splits() { + Ok(splits) => splits, + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to deserialize metrics splits"); + break; + } + }, + Err(err) => { + error!(index_uid=%index_uid, error=?err, "failed to list metrics splits"); + break; + } + }; + + // We page through the list of splits to delete using a limit and a `search_after` trick. + // To detect if this is the last page, we check if the number of splits is less than the + // limit. + assert!(splits.len() <= DELETE_PARQUET_SPLITS_BATCH_SIZE); + let splits_to_delete_possibly_remaining = + splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE; + + // Set split after which to search for the next loop. + let Some(last_split) = splits.last() else { + break; + }; + query = query.with_after_split_id(last_split.metadata.split_id.to_string()); + + let (batch_succeeded, batch_failed) = delete_parquet_splits_from_storage_and_metastore( + metastore, + index_uid, + storage.as_ref(), + &splits, + progress_opt, + ) + .await; + removal_info + .removed_parquet_splits_entries + .extend(batch_succeeded); + removal_info.failed_parquet_splits.extend(batch_failed); + + if splits_to_delete_possibly_remaining { + sleep_future.await; + } else { + // Stop the GC if this was the last batch. + // We are guaranteed to make progress due to .with_after_split_id(). + break; + } + } + + Ok(removal_info) +} + +/// Deletes a single batch of parquet splits from storage and metastore. +/// Returns (succeeded, failed). +async fn delete_parquet_splits_from_storage_and_metastore( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + storage: &dyn Storage, + splits: &[MetricsSplitRecord], + progress_opt: Option<&Progress>, +) -> (Vec, Vec) { + let splits_to_delete: Vec = splits + .iter() + .map(|s| SplitToDelete { + split_id: s.metadata.split_id.to_string(), + path: PathBuf::from(s.metadata.parquet_filename()), + size_bytes: s.metadata.size_bytes, + }) + .collect(); + + let (succeeded_stds, failed_stds, storage_err) = + delete_split_files(storage, splits_to_delete, progress_opt).await; + + if let Some(bulk_err) = storage_err { + warn!( + index_id=%index_uid, + num_failed=%failed_stds.len(), + num_succeeded=%succeeded_stds.len(), + error=?bulk_err, + "partial failure deleting parquet files from storage" + ); + } + + let storage_failed: Vec = failed_stds + .into_iter() + .map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes }) + .collect(); + + if succeeded_stds.is_empty() { + return (Vec::new(), storage_failed); + } + + let batch_len = succeeded_stds.len(); + let ids_to_delete: Vec = succeeded_stds.iter().map(|s| s.split_id.clone()).collect(); + let delete_request = DeleteMetricsSplitsRequest { + index_uid: Some(index_uid.clone()), + split_ids: ids_to_delete, + }; + let metastore_result = + protect_future(progress_opt, metastore.delete_metrics_splits(delete_request)).await; + + if let Some(progress) = progress_opt { + progress.record_progress(); + } + + let succeeded: Vec = succeeded_stds + .into_iter() + .map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes }) + .collect(); + + match metastore_result { + Ok(_) => { + let bytes_deleted: u64 = succeeded.iter().map(|s| s.file_size_bytes).sum(); + info!(index_uid=%index_uid, count=%batch_len, bytes=%bytes_deleted, "deleted parquet splits"); + (succeeded, storage_failed) + } + Err(err) => { + error!(index_uid=%index_uid, count=%batch_len, error=?err, "failed to delete parquet splits from metastore"); + let mut failed = storage_failed; + failed.extend(succeeded); + (Vec::new(), failed) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Duration; + + use quickwit_metastore::ListMetricsSplitsResponseExt; + use quickwit_parquet_engine::split::{ + MetricsSplitMetadata, MetricsSplitRecord, MetricsSplitState, SplitId, TimeRange, + }; + use quickwit_proto::metastore::{ + EmptyResponse, ListMetricsSplitsResponse, MetastoreServiceClient, MockMetastoreService, + }; + use quickwit_storage::{DeleteFailure, MockStorage}; + + use super::*; + + const TEST_INDEX: &str = "otel-metrics-v0_9"; + + fn test_index_uid() -> IndexUid { + IndexUid::for_test(TEST_INDEX, 0) + } + + fn make_split(split_id: &str, state: MetricsSplitState) -> MetricsSplitRecord { + MetricsSplitRecord { + state, + update_timestamp: 0, + metadata: MetricsSplitMetadata::builder() + .split_id(SplitId::new(split_id)) + .index_uid(test_index_uid().to_string()) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(1024) + .build(), + } + } + + fn list_response(splits: &[MetricsSplitRecord]) -> ListMetricsSplitsResponse { + ListMetricsSplitsResponse::try_from_splits(splits).unwrap() + } + + fn test_indexes(storage: Arc) -> HashMap> { + HashMap::from([(test_index_uid(), storage)]) + } + + #[tokio::test] + async fn test_parquet_gc_marks_stale_staged_splits() { + let mut mock = MockMetastoreService::new(); + + let staged = vec![ + make_split("staged-1", MetricsSplitState::Staged), + make_split("staged-2", MetricsSplitState::Staged), + ]; + let resp = list_response(&staged); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(resp.clone())); + mock.expect_mark_metrics_splits_for_deletion() + .times(1) + .returning(|req| { + assert_eq!(req.index_uid().index_id, TEST_INDEX); + assert_eq!(req.split_ids.len(), 2); + Ok(EmptyResponse {}) + }); + mock.expect_list_metrics_splits() + .times(1) + .returning(|_| Ok(ListMetricsSplitsResponse::empty())); + + let result = run_parquet_garbage_collect( + test_indexes(Arc::new(MockStorage::new())), + MetastoreServiceClient::from_mock(mock), + Duration::from_secs(0), + Duration::from_secs(30), + false, + None, + None, + ) + .await + .unwrap(); + + assert_eq!(result.removed_split_count(), 0); + assert_eq!(result.failed_split_count(), 0); + } + + #[tokio::test] + async fn test_parquet_gc_deletes_marked_splits() { + let mut mock = MockMetastoreService::new(); + + mock.expect_list_metrics_splits() + .times(1) + .returning(|_| Ok(ListMetricsSplitsResponse::empty())); + + let marked = vec![ + make_split("marked-1", MetricsSplitState::MarkedForDeletion), + make_split("marked-2", MetricsSplitState::MarkedForDeletion), + ]; + let resp = list_response(&marked); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(resp.clone())); + mock.expect_delete_metrics_splits() + .times(1) + .returning(|req| { + assert_eq!(req.index_uid().index_id, TEST_INDEX); + assert_eq!(req.split_ids.len(), 2); + Ok(EmptyResponse {}) + }); + + let mut storage = MockStorage::new(); + storage.expect_bulk_delete().times(1).returning(|paths| { + assert_eq!(paths.len(), 2); + Ok(()) + }); + + let result = run_parquet_garbage_collect( + test_indexes(Arc::new(storage)), + MetastoreServiceClient::from_mock(mock), + Duration::from_secs(30), + Duration::from_secs(0), + false, + None, + None, + ) + .await + .unwrap(); + + assert_eq!(result.removed_split_count(), 2); + assert_eq!(result.removed_bytes(), 2048); + assert_eq!(result.failed_split_count(), 0); + } + + #[tokio::test] + async fn test_parquet_gc_handles_partial_storage_failure() { + let mut mock = MockMetastoreService::new(); + + mock.expect_list_metrics_splits() + .times(1) + .returning(|_| Ok(ListMetricsSplitsResponse::empty())); + + let marked = vec![ + make_split("ok-split", MetricsSplitState::MarkedForDeletion), + make_split("fail-split", MetricsSplitState::MarkedForDeletion), + ]; + let resp = list_response(&marked); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(resp.clone())); + // Only the successful split should be deleted from metastore + mock.expect_delete_metrics_splits() + .times(1) + .returning(|req| { + assert_eq!(req.split_ids.len(), 1); + assert_eq!(req.split_ids[0], "ok-split"); + Ok(EmptyResponse {}) + }); + + let mut storage = MockStorage::new(); + storage.expect_bulk_delete().times(1).returning(|_paths| { + let successes = vec![PathBuf::from("ok-split.parquet")]; + let failures = HashMap::from([( + PathBuf::from("fail-split.parquet"), + DeleteFailure { + code: Some("AccessDenied".to_string()), + ..Default::default() + }, + )]); + Err(quickwit_storage::BulkDeleteError { + successes, + failures, + ..Default::default() + }) + }); + + let result = run_parquet_garbage_collect( + test_indexes(Arc::new(storage)), + MetastoreServiceClient::from_mock(mock), + Duration::from_secs(30), + Duration::from_secs(0), + false, + None, + None, + ) + .await + .unwrap(); + + assert_eq!(result.removed_split_count(), 1); + assert_eq!(result.removed_bytes(), 1024); + assert_eq!(result.failed_split_count(), 1); + } + + #[tokio::test] + async fn test_parquet_gc_dry_run() { + let mut mock = MockMetastoreService::new(); + + // Phase 1: list staged splits + let staged = vec![make_split("staged-1", MetricsSplitState::Staged)]; + let resp = list_response(&staged); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(resp.clone())); + // mark_metrics_splits_for_deletion should NOT be called in dry_run + mock.expect_mark_metrics_splits_for_deletion().times(0); + + // Phase 2: list marked splits + let marked = vec![ + make_split("marked-1", MetricsSplitState::MarkedForDeletion), + make_split("marked-2", MetricsSplitState::MarkedForDeletion), + ]; + let resp = list_response(&marked); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(resp.clone())); + // delete should NOT be called in dry_run + mock.expect_delete_metrics_splits().times(0); + + let storage = MockStorage::new(); + // bulk_delete should NOT be called in dry_run + + let result = run_parquet_garbage_collect( + test_indexes(Arc::new(storage)), + MetastoreServiceClient::from_mock(mock), + Duration::from_secs(0), + Duration::from_secs(0), + true, + None, + None, + ) + .await + .unwrap(); + + // Dry run reports candidates as "removed" (would be removed): + // 1 stale staged + 2 marked for deletion = 3 candidates + assert_eq!(result.removed_split_count(), 3); + assert_eq!(result.removed_bytes(), 3072); + assert_eq!(result.failed_split_count(), 0); + } +} From aef848feab69fffae1e7d358a3300665612dba89 Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 16:44:16 -0400 Subject: [PATCH 4/7] Integrate parquet GC and retention policy into janitor actors --- quickwit/quickwit-janitor/Cargo.toml | 1 + .../src/actors/garbage_collector.rs | 284 ++++++++++++++---- .../src/actors/retention_policy_executor.rs | 106 ++++++- quickwit/quickwit-janitor/src/lib.rs | 2 +- quickwit/quickwit-janitor/src/metrics.rs | 22 +- .../src/retention_policy_execution.rs | 54 +++- 6 files changed, 383 insertions(+), 86 deletions(-) diff --git a/quickwit/quickwit-janitor/Cargo.toml b/quickwit/quickwit-janitor/Cargo.toml index 8d4cad3beb9..30b8c3564d5 100644 --- a/quickwit/quickwit-janitor/Cargo.toml +++ b/quickwit/quickwit-janitor/Cargo.toml @@ -32,6 +32,7 @@ quickwit-doc-mapper = { workspace = true } quickwit-index-management = { workspace = true } quickwit-indexing = { workspace = true } quickwit-metastore = { workspace = true } +quickwit-parquet-engine = { workspace = true } quickwit-proto = { workspace = true } quickwit-query = { workspace = true } quickwit-search = { workspace = true } diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 854346155e3..e95febbaacb 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,8 +20,9 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{StreamExt, stream}; use quickwit_actors::{Actor, ActorContext, Handler}; +use quickwit_common::is_metrics_index; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::{GcMetrics, run_garbage_collect}; +use quickwit_index_management::{GcMetrics, run_garbage_collect, run_parquet_garbage_collect}; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -35,6 +36,42 @@ use crate::metrics::JANITOR_METRICS; const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes +/// Result of a GC run (tantivy or parquet). +struct GcRunResult { + num_deleted_splits: usize, + num_deleted_bytes: usize, + num_failed: usize, + sample_deleted_files: Vec, +} + +impl GcRunResult { + fn failed() -> Self { + Self { + num_deleted_splits: 0, + num_deleted_bytes: 0, + num_failed: 0, + sample_deleted_files: Vec::new(), + } + } +} + +fn gc_metrics(split_type: &str) -> GcMetrics { + GcMetrics { + deleted_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["success", split_type]) + .clone(), + deleted_bytes: JANITOR_METRICS + .gc_deleted_bytes + .with_label_values([split_type]) + .clone(), + failed_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["error", split_type]) + .clone(), + } +} + /// Staged files needs to be deleted if there was a failure. /// TODO ideally we want clean up all staged splits every time we restart the indexing pipeline, but /// the grace period strategy should do the job for the moment. @@ -76,6 +113,37 @@ impl GarbageCollector { counters: GarbageCollectorCounters::default(), } } + // if !deleted_file_entries.is_empty() { + // let num_deleted_splits = deleted_file_entries.len(); + // let num_deleted_bytes = deleted_file_entries + // .iter() + // .map(|entry| entry.file_size_bytes.as_u64() as usize) + // .sum::(); + // let deleted_files: HashSet<&Path> = deleted_file_entries + // .iter() + // .map(|deleted_entry| deleted_entry.file_name.as_path()) + // .take(5) + // .collect(); + // info!( + // num_deleted_splits = num_deleted_splits, + // "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, + // ); + // self.counters.num_deleted_files += num_deleted_splits; + // self.counters.num_deleted_bytes += num_deleted_bytes; + + fn record_gc_result(&mut self, result: &GcRunResult, split_type: &str) { + self.counters.num_failed_splits += result.num_failed; + if result.num_deleted_splits > 0 { + info!( + "Janitor deleted {:?} and {} other {} splits.", + result.sample_deleted_files, + result.num_deleted_splits, + split_type, + ); + self.counters.num_deleted_files += result.num_deleted_splits; + self.counters.num_deleted_bytes += result.num_deleted_bytes; + } + } /// Gc Loop handler logic. /// Should not return an error to prevent the actor from crashing. @@ -83,8 +151,6 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; - let start = Instant::now(); - let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -106,7 +172,12 @@ impl GarbageCollector { info!("loaded {} indexes from the metastore", indexes.len()); let expected_count = indexes.len(); - let index_storages: HashMap> = stream::iter(indexes).filter_map(|index| { + + // Resolve storages and split into tantivy vs parquet indexes. + let mut tantivy_storages: HashMap> = HashMap::new(); + let mut parquet_storages: HashMap> = HashMap::new(); + + let resolved: Vec<_> = stream::iter(indexes).filter_map(|index| { let storage_resolver = self.storage_resolver.clone(); async move { let index_uid = index.index_uid.clone(); @@ -114,7 +185,7 @@ impl GarbageCollector { let storage = match storage_resolver.resolve(index_uri).await { Ok(storage) => storage, Err(error) => { - error!(index=%index.index_id(), error=?error, "failed to resolve the index storage Uri"); + error!(index=%index_uid.index_id, error=?error, "failed to resolve the index storage Uri"); return None; } }; @@ -122,68 +193,94 @@ impl GarbageCollector { }}).collect() .await; - let storage_got_count = index_storages.len(); - self.counters.num_failed_storage_resolution += expected_count - storage_got_count; + self.counters.num_failed_storage_resolution += expected_count - resolved.len(); - if index_storages.is_empty() { + for (index_uid, storage) in resolved { + if is_metrics_index(&index_uid.index_id) { + parquet_storages.insert(index_uid, storage); + } else { + tantivy_storages.insert(index_uid, storage); + } + } + + if tantivy_storages.is_empty() && parquet_storages.is_empty() { return; } - let gc_res = run_garbage_collect( - index_storages, - self.metastore.clone(), - STAGED_GRACE_PERIOD, - split_deletion_grace_period(), - false, - Some(ctx.progress()), - Some(GcMetrics { - deleted_splits: JANITOR_METRICS - .gc_deleted_splits - .with_label_values(["success"]) - .clone(), - deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), - failed_splits: JANITOR_METRICS - .gc_deleted_splits - .with_label_values(["error"]) - .clone(), - }), - ) - .await; + // Run Tantivy GC + if !tantivy_storages.is_empty() { + let tantivy_start = Instant::now(); + let gc_res = run_garbage_collect( + tantivy_storages, + self.metastore.clone(), + STAGED_GRACE_PERIOD, + split_deletion_grace_period(), + false, + Some(ctx.progress()), + Some(gc_metrics("tantivy")), + ) + .await; - let run_duration = start.elapsed().as_secs(); - JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); + let tantivy_run_duration = tantivy_start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.with_label_values(["tantivy"]).inc_by(tantivy_run_duration); + + let result = match gc_res { + Ok(removal_info) => { + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["success", "tantivy"]).inc(); + GcRunResult { + num_deleted_splits: removal_info.removed_split_entries.len(), + num_deleted_bytes: removal_info.removed_split_entries.iter().map(|e| e.file_size_bytes.as_u64() as usize).sum(), + num_failed: removal_info.failed_splits.len(), + sample_deleted_files: removal_info.removed_split_entries.iter().take(5).map(|e| e.file_name.display().to_string()).collect(), + } + } + Err(error) => { + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["error", "tantivy"]).inc(); + error!(error=?error, "failed to run garbage collection"); + GcRunResult::failed() + } + }; + self.record_gc_result(&result, "tantivy"); + } - let deleted_file_entries = match gc_res { - Ok(removal_info) => { - self.counters.num_successful_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc(); - self.counters.num_failed_splits += removal_info.failed_splits.len(); - removal_info.removed_split_entries - } - Err(error) => { - self.counters.num_failed_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc(); - error!(error=?error, "failed to run garbage collection"); - return; - } - }; - if !deleted_file_entries.is_empty() { - let num_deleted_splits = deleted_file_entries.len(); - let num_deleted_bytes = deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::(); - let deleted_files: HashSet<&Path> = deleted_file_entries - .iter() - .map(|deleted_entry| deleted_entry.file_name.as_path()) - .take(5) - .collect(); - info!( - num_deleted_splits = num_deleted_splits, - "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, - ); - self.counters.num_deleted_files += num_deleted_splits; - self.counters.num_deleted_bytes += num_deleted_bytes; + // Run Parquet GC + if !parquet_storages.is_empty() { + let parquet_start = Instant::now(); + let gc_res = run_parquet_garbage_collect( + parquet_storages, + self.metastore.clone(), + STAGED_GRACE_PERIOD, + split_deletion_grace_period(), + false, + Some(ctx.progress()), + Some(gc_metrics("parquet")), + ) + .await; + + let parquet_run_duration = parquet_start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.with_label_values(["parquet"]).inc_by(parquet_run_duration); + + let result = match gc_res { + Ok(removal_info) => { + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["success", "parquet"]).inc(); + GcRunResult { + num_deleted_splits: removal_info.removed_split_count(), + num_deleted_bytes: removal_info.removed_bytes() as usize, + num_failed: removal_info.failed_split_count(), + sample_deleted_files: removal_info.removed_parquet_splits_entries.iter().take(5).map(|e| format!("{}.parquet", e.split_id)).collect(), + } + } + Err(error) => { + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["error", "parquet"]).inc(); + error!(error=?error, "failed to run parquet garbage collection"); + GcRunResult::failed() + } + }; + self.record_gc_result(&result, "parquet"); } } } @@ -756,4 +853,69 @@ mod tests { assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } + + #[tokio::test] + async fn test_garbage_collect_parquet_index() { + use quickwit_metastore::ListMetricsSplitsResponseExt; + use quickwit_parquet_engine::split::{ + MetricsSplitMetadata, MetricsSplitRecord, MetricsSplitState, SplitId, TimeRange, + }; + use quickwit_proto::metastore::ListMetricsSplitsResponse; + + let storage_resolver = StorageResolver::unconfigured(); + let mut mock = MockMetastoreService::new(); + + mock.expect_list_indexes_metadata() + .times(1) + .returning(|_| { + let indexes = vec![IndexMetadata::for_test( + "otel-metrics-v0_1", + "ram://indexes/otel-metrics-v0_1", + )]; + Ok(ListIndexesMetadataResponse::for_test(indexes)) + }); + + let marked_split = MetricsSplitRecord { + state: MetricsSplitState::MarkedForDeletion, + update_timestamp: 0, + metadata: MetricsSplitMetadata::builder() + .split_id(SplitId::new("metrics_aaa")) + .index_uid("otel-metrics-v0_1:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .num_rows(10) + .size_bytes(512) + .build(), + }; + + // Phase 1 (staged): empty + mock.expect_list_metrics_splits() + .times(1) + .returning(|_| Ok(ListMetricsSplitsResponse::empty())); + // Phase 2 (marked): one split to delete + let marked_resp = ListMetricsSplitsResponse::try_from_splits(&[marked_split]).unwrap(); + mock.expect_list_metrics_splits() + .times(1) + .returning(move |_| Ok(marked_resp.clone())); + mock.expect_delete_metrics_splits() + .times(1) + .returning(|req| { + assert_eq!(req.split_ids, ["metrics_aaa"]); + Ok(EmptyResponse {}) + }); + + let garbage_collect_actor = GarbageCollector::new( + MetastoreServiceClient::from_mock(mock), + storage_resolver, + ); + let universe = Universe::with_accelerated_time(); + let (_mailbox, handle) = universe.spawn_builder().spawn(garbage_collect_actor); + + let counters = handle.process_pending_and_observe().await.state; + assert_eq!(counters.num_passes, 1); + assert_eq!(counters.num_successful_gc_run, 1); + assert_eq!(counters.num_failed_gc_run, 0); + assert_eq!(counters.num_deleted_files, 1); + assert_eq!(counters.num_failed_splits, 0); + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index 38a1fbd3e8f..26845aefb56 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -18,6 +18,7 @@ use std::time::Duration; use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, Handler}; +use quickwit_common::is_metrics_index; use quickwit_config::IndexConfig; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ @@ -27,7 +28,9 @@ use quickwit_proto::types::IndexUid; use serde::Serialize; use tracing::{debug, error, info}; -use crate::retention_policy_execution::run_execute_retention_policy; +use crate::retention_policy_execution::{ + run_execute_parquet_retention_policy, run_execute_retention_policy, +}; const RUN_INTERVAL: Duration = Duration::from_secs(60 * 60); // 1 hours @@ -207,17 +210,33 @@ impl Handler for RetentionPolicyExecutor { .as_ref() .expect("Expected index to have retention policy configure."); - let execution_result = run_execute_retention_policy( - message.index_uid.clone(), - self.metastore.clone(), - retention_policy, - ctx, - ) - .await; - match execution_result { - Ok(splits) => self.counters.num_expired_splits += splits.len(), - Err(error) => { - error!(index_id=%message.index_uid.index_id, error=?error, "Failed to execute the retention policy on the index.") + if is_metrics_index(&message.index_uid.index_id) { + let execution_result = run_execute_parquet_retention_policy( + &message.index_uid, + self.metastore.clone(), + retention_policy, + ctx, + ) + .await; + match execution_result { + Ok(count) => self.counters.num_expired_splits += count, + Err(error) => { + error!(index_id=%message.index_uid.index_id, error=?error, "Failed to execute the parquet retention policy on the index.") + } + } + } else { + let execution_result = run_execute_retention_policy( + message.index_uid.clone(), + self.metastore.clone(), + retention_policy, + ctx, + ) + .await; + match execution_result { + Ok(splits) => self.counters.num_expired_splits += splits.len(), + Err(error) => { + error!(index_id=%message.index_uid.index_id, error=?error, "Failed to execute the retention policy on the index.") + } } } @@ -499,4 +518,67 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_parquet_retention_policy_execution_calls_dependencies() -> anyhow::Result<()> { + use quickwit_metastore::ListMetricsSplitsResponseExt; + use quickwit_parquet_engine::split::{ + MetricsSplitMetadata, MetricsSplitRecord, MetricsSplitState, SplitId, TimeRange, + }; + use quickwit_proto::metastore::ListMetricsSplitsResponse; + + let mut mock_metastore = MockMetastoreService::new(); + + // One metrics index with a 1-hour retention policy + mock_metastore + .expect_list_indexes_metadata() + .times(..) + .returning(|_| { + let indexes = make_indexes(&[("otel-metrics-v0_9", Some("1 hour"))]); + Ok(ListIndexesMetadataResponse::for_test(indexes)) + }); + + // Two published splits older than the retention cutoff + let expired_split = MetricsSplitRecord { + state: MetricsSplitState::Published, + update_timestamp: 0, + metadata: MetricsSplitMetadata::builder() + .split_id(SplitId::new("metrics_expired")) + .index_uid("otel-metrics-v0_9:00000000000000000000000000") + .time_range(TimeRange::new(0, 100)) + .num_rows(10) + .size_bytes(512) + .build(), + }; + let resp = ListMetricsSplitsResponse::try_from_splits(&[expired_split]).unwrap(); + mock_metastore + .expect_list_metrics_splits() + .times(1..) + .returning(move |_| Ok(resp.clone())); + + mock_metastore + .expect_mark_metrics_splits_for_deletion() + .times(1..) + .returning(|req| { + assert_eq!(req.split_ids, ["metrics_expired"]); + Ok(EmptyResponse {}) + }); + + let retention_policy_executor = + RetentionPolicyExecutor::new(MetastoreServiceClient::from_mock(mock_metastore)); + let universe = Universe::with_accelerated_time(); + let (_mailbox, handle) = universe.spawn_builder().spawn(retention_policy_executor); + + let counters = handle.process_pending_and_observe().await.state; + assert_eq!(counters.num_execution_passes, 0); + assert_eq!(counters.num_expired_splits, 0); + + universe.sleep(shift_time_by()).await; + let counters = handle.process_pending_and_observe().await.state; + assert_eq!(counters.num_execution_passes, 1); + assert_eq!(counters.num_expired_splits, 1); + universe.assert_quit().await; + + Ok(()) + } } diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index bef73160377..c93f9a02242 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -18,7 +18,7 @@ use quickwit_actors::{Mailbox, Universe}; use quickwit_common::pubsub::EventBroker; use quickwit_config::NodeConfig; use quickwit_indexing::actors::MergeSchedulerService; -use quickwit_metastore::SplitInfo; +use quickwit_metastore::{SplitInfo}; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::SearchJobPlacer; use quickwit_storage::StorageResolver; diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index f503739529a..c2093e88d3c 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -14,17 +14,15 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - IntCounter, IntCounterVec, IntGaugeVec, new_counter, new_counter_vec, new_gauge_vec, + IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec, }; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, - pub gc_deleted_splits: IntCounterVec<1>, - pub gc_deleted_bytes: IntCounter, - pub gc_runs: IntCounterVec<1>, - pub gc_seconds_total: IntCounter, - // TODO having a current run duration which is 0|undefined out of run, and returns `now - - // start_time` during a run would be nice + pub gc_deleted_splits: IntCounterVec<2>, + pub gc_deleted_bytes: IntCounterVec<1>, + pub gc_runs: IntCounterVec<2>, + pub gc_seconds_total: IntCounterVec<1>, } impl Default for JanitorMetrics { @@ -42,26 +40,28 @@ impl Default for JanitorMetrics { "Total number of splits deleted by the garbage collector.", "quickwit_janitor", &[], - ["result"], + ["result", "split_type"], ), - gc_deleted_bytes: new_counter( + gc_deleted_bytes: new_counter_vec( "gc_deleted_bytes_total", "Total number of bytes deleted by the garbage collector.", "quickwit_janitor", &[], + ["split_type"], ), gc_runs: new_counter_vec( "gc_runs_total", "Total number of garbage collector execition.", "quickwit_janitor", &[], - ["result"], + ["result", "split_type"], ), - gc_seconds_total: new_counter( + gc_seconds_total: new_counter_vec( "gc_seconds_total", "Total time spent running the garbage collector", "quickwit_janitor", &[], + ["split_type"], ), } } diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index e0ed448ff86..e37f1e56181 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -16,11 +16,13 @@ use quickwit_actors::ActorContext; use quickwit_common::pretty::PrettySample; use quickwit_config::RetentionPolicy; use quickwit_metastore::{ + ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ - ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient, + ListMetricsSplitsRequest, ListSplitsRequest, MarkMetricsSplitsForDeletionRequest, + MarkSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -91,3 +93,53 @@ pub async fn run_execute_retention_policy( .await?; Ok(expired_splits) } + +/// Detect all expired parquet splits based on a retention policy and +/// mark them as `MarkedForDeletion`. +pub async fn run_execute_parquet_retention_policy( + index_uid: &IndexUid, + metastore: MetastoreServiceClient, + retention_policy: &RetentionPolicy, + ctx: &ActorContext, +) -> anyhow::Result { + let retention_period = retention_policy.retention_period()?; + let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + let max_retention_timestamp = current_timestamp - retention_period.as_secs() as i64; + + let query = ListMetricsSplitsQuery::for_index(index_uid.clone()) + .with_max_time_range_end(max_retention_timestamp); + + let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)?; + let response = ctx + .protect_future(metastore.list_metrics_splits(request)) + .await?; + + let expired_splits: Vec = + response.deserialize_splits()?; + + if expired_splits.is_empty() { + return Ok(0); + } + + let expired_split_ids: Vec = expired_splits + .iter() + .map(|s| s.metadata.split_id.to_string()) + .collect(); + + info!( + index_uid=%index_uid, + split_ids=?PrettySample::new(&expired_split_ids, 5), + "Marking {} parquet splits for deletion based on retention policy.", + expired_split_ids.len() + ); + + ctx.protect_future( + metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest { + index_uid: Some(index_uid.clone()), + split_ids: expired_split_ids, + }), + ) + .await?; + + Ok(expired_splits.len()) +} From bc3e7af57a19fd101448ad33e882c0e40953e0dc Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 16:54:04 -0400 Subject: [PATCH 5/7] Add CLI support for parquet GC, IndexService method, and OTEL metrics retention config --- quickwit/quickwit-cli/src/tool.rs | 86 +++++++++++++++---- .../quickwit-index-management/src/index.rs | 42 +++++++++ quickwit/quickwit-index-management/src/lib.rs | 2 +- .../src/otlp/otel_metrics.rs | 18 ++++ 4 files changed, 129 insertions(+), 19 deletions(-) diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 0f34a3017e6..2b4e5f6159c 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -660,51 +660,101 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow: get_resolvers(&config.storage_configs, &config.metastore_configs); let metastore = metastore_resolver.resolve(&config.metastore_uri).await?; let mut index_service = IndexService::new(metastore, storage_resolver); + + if quickwit_common::is_metrics_index(&args.index_id) { + let removal_info = index_service + .garbage_collect_parquet_index(&args.index_id, args.grace_period, args.dry_run) + .await?; + return print_parquet_gc_result(args.dry_run, removal_info); + } + let removal_info = index_service .garbage_collect_index(&args.index_id, args.grace_period, args.dry_run) .await?; + print_tantivy_gc_result(args.dry_run, removal_info) +} + +fn print_tantivy_gc_result( + dry_run: bool, + removal_info: quickwit_index_management::SplitRemovalInfo, +) -> anyhow::Result<()> { if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() { println!("No dangling files to garbage collect."); return Ok(()); } - if args.dry_run { + if dry_run { println!("The following files will be garbage collected."); - for split_info in removal_info.removed_split_entries { - println!(" - {}", split_info.file_name.display()); + for entry in &removal_info.removed_split_entries { + println!(" - {}", entry.file_name.display()); } return Ok(()); } if !removal_info.failed_splits.is_empty() { println!("The following splits were attempted to be removed, but failed."); - for split_info in &removal_info.failed_splits { - println!(" - {}", split_info.split_id); + for split in &removal_info.failed_splits { + println!(" - {}", split.split_id); } - println!( - "{} Splits were unable to be removed.", - removal_info.failed_splits.len() - ); + println!("{} Splits were unable to be removed.", removal_info.failed_splits.len()); } let deleted_bytes: u64 = removal_info .removed_split_entries .iter() - .map(|split_info| split_info.file_size_bytes.as_u64()) + .map(|s| s.file_size_bytes.as_u64()) .sum(); - println!( - "{}MB of storage garbage collected.", - deleted_bytes / 1_000_000 - ); + println!("{}MB of storage garbage collected.", deleted_bytes / 1_000_000); if removal_info.failed_splits.is_empty() { + println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR)); + } else if removal_info.removed_split_entries.is_empty() { + println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR)); + } else { println!( - "{} Index successfully garbage collected.", - "✔".color(GREEN_COLOR) + "{} Index partially garbage collected.", + "✘".color(RED_COLOR) ); - } else if removal_info.removed_split_entries.is_empty() - && !removal_info.failed_splits.is_empty() + } + + Ok(()) +} + +fn print_parquet_gc_result( + dry_run: bool, + removal_info: quickwit_index_management::ParquetSplitRemovalInfo, +) -> anyhow::Result<()> { + if removal_info.removed_parquet_splits_entries.is_empty() + && removal_info.failed_parquet_splits.is_empty() { + println!("No dangling files to garbage collect."); + return Ok(()); + } + + if dry_run { + println!("The following files will be garbage collected."); + for entry in &removal_info.removed_parquet_splits_entries { + println!(" - {}.parquet", entry.split_id); + } + return Ok(()); + } + + if !removal_info.failed_parquet_splits.is_empty() { + println!("The following splits were attempted to be removed, but failed."); + for split in &removal_info.failed_parquet_splits { + println!(" - {}", split.split_id); + } + println!( + "{} Splits were unable to be removed.", + removal_info.failed_parquet_splits.len() + ); + } + + println!("{}MB of storage garbage collected.", removal_info.removed_bytes() / 1_000_000); + + if removal_info.failed_parquet_splits.is_empty() { + println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR)); + } else if removal_info.removed_parquet_splits_entries.is_empty() { println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR)); } else { println!( diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 2cfa0614e40..1787ff1e276 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -45,6 +45,7 @@ use crate::garbage_collection::{ DeleteSplitsError, SplitRemovalInfo, delete_splits_from_storage_and_metastore, run_garbage_collect, }; +use crate::parquet_garbage_collection::{ParquetSplitRemovalInfo, run_parquet_garbage_collect}; #[derive(Error, Debug)] pub enum IndexServiceError { @@ -405,6 +406,47 @@ impl IndexService { Ok(deleted_entries) } + /// Detect all dangling parquet splits and associated files from a metrics index + /// and removes them. + /// + /// * `index_id` - The target metrics index Id. + /// * `grace_period` - Threshold period after which a staged split can be garbage collected. + /// * `dry_run` - Should this only return a list of affected files without performing deletion. + pub async fn garbage_collect_parquet_index( + &mut self, + index_id: &str, + grace_period: Duration, + dry_run: bool, + ) -> anyhow::Result { + let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); + let index_metadata = self + .metastore + .index_metadata(index_metadata_request) + .await? + .deserialize_index_metadata()?; + let index_uid = index_metadata.index_uid.clone(); + let index_config = index_metadata.into_index_config(); + let storage = self + .storage_resolver + .resolve(&index_config.index_uri) + .await?; + + let deleted_entries = run_parquet_garbage_collect( + HashMap::from([(index_uid, storage)]), + self.metastore.clone(), + grace_period, + // deletion_grace_period of zero, so that a cli call directly deletes splits after + // marking to be deleted. + Duration::ZERO, + dry_run, + None, + None, + ) + .await?; + + Ok(deleted_entries) + } + /// Clears the index by applying the following actions: /// - mark all splits for deletion in the metastore. /// - delete the files of all splits marked for deletion using garbage collection. diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index cd35f215825..be44c9f081c 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -16,7 +16,7 @@ mod garbage_collection; mod index; mod parquet_garbage_collection; -pub use garbage_collection::{GcMetrics, run_garbage_collect}; +pub use garbage_collection::{GcMetrics, SplitRemovalInfo, run_garbage_collect}; pub use index::{IndexService, IndexServiceError, clear_cache_directory, validate_storage_uri}; pub use parquet_garbage_collection::{ ParquetSplitInfo, ParquetSplitRemovalInfo, run_parquet_garbage_collect, diff --git a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs index bcb67c5fadb..5cab2f96a87 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs @@ -47,6 +47,9 @@ pub const OTEL_METRICS_INDEX_ID: &str = "otel-metrics-v0_9"; /// directly and queries via DataFusion. The doc mapping is unused; this config /// exists only so the index can be registered in the metastore for source /// assignment and lifecycle management. +/// +/// TODO: As a temporary hack, we are including a timestamp_field, so that +/// we can pass the retention policy validation. const OTEL_METRICS_INDEX_CONFIG: &str = r#" version: 0.8 @@ -54,10 +57,19 @@ index_id: ${INDEX_ID} doc_mapping: mode: dynamic + field_mappings: + - name: timestamp + type: datetime + fast: true + timestamp_field: timestamp indexing_settings: commit_timeout_secs: 15 +retention: + period: 30 days + schedule: hourly + search_settings: default_search_fields: [] "#; @@ -542,6 +554,12 @@ mod tests { let index_config = OtlpGrpcMetricsService::index_config(&Uri::for_test("ram:///indexes")).unwrap(); assert_eq!(index_config.index_id, OTEL_METRICS_INDEX_ID); + let retention = index_config.retention_policy_opt.expect("retention policy should be set"); + assert_eq!( + retention.retention_period().unwrap(), + std::time::Duration::from_secs(30 * 24 * 3600) + ); + assert_eq!(retention.evaluation_schedule, "hourly"); } #[tokio::test] From fa5eee380e0c800e58ba572393b6c1aa11013a6a Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Mon, 30 Mar 2026 17:06:46 -0400 Subject: [PATCH 6/7] Fix rustfmt formatting --- quickwit/quickwit-cli/src/tool.rs | 25 ++++- .../src/garbage_collection.rs | 21 ++++- .../src/parquet_garbage_collection.rs | 20 ++-- .../src/actors/garbage_collector.rs | 93 ++++++++++--------- quickwit/quickwit-janitor/src/lib.rs | 2 +- quickwit/quickwit-janitor/src/metrics.rs | 4 +- .../src/retention_policy_execution.rs | 10 +- .../file_backed/file_backed_index/mod.rs | 8 +- .../src/otlp/otel_metrics.rs | 10 +- 9 files changed, 119 insertions(+), 74 deletions(-) diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 2b4e5f6159c..cb43f2cd633 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -696,7 +696,10 @@ fn print_tantivy_gc_result( for split in &removal_info.failed_splits { println!(" - {}", split.split_id); } - println!("{} Splits were unable to be removed.", removal_info.failed_splits.len()); + println!( + "{} Splits were unable to be removed.", + removal_info.failed_splits.len() + ); } let deleted_bytes: u64 = removal_info @@ -704,10 +707,16 @@ fn print_tantivy_gc_result( .iter() .map(|s| s.file_size_bytes.as_u64()) .sum(); - println!("{}MB of storage garbage collected.", deleted_bytes / 1_000_000); + println!( + "{}MB of storage garbage collected.", + deleted_bytes / 1_000_000 + ); if removal_info.failed_splits.is_empty() { - println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR)); + println!( + "{} Index successfully garbage collected.", + "✔".color(GREEN_COLOR) + ); } else if removal_info.removed_split_entries.is_empty() { println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR)); } else { @@ -750,10 +759,16 @@ fn print_parquet_gc_result( ); } - println!("{}MB of storage garbage collected.", removal_info.removed_bytes() / 1_000_000); + println!( + "{}MB of storage garbage collected.", + removal_info.removed_bytes() / 1_000_000 + ); if removal_info.failed_parquet_splits.is_empty() { - println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR)); + println!( + "{} Index successfully garbage collected.", + "✔".color(GREEN_COLOR) + ); } else if removal_info.removed_parquet_splits_entries.is_empty() { println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR)); } else { diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 3696181b6ba..606de030762 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -424,7 +424,11 @@ pub(crate) async fn delete_split_files( storage: &dyn Storage, splits: Vec, progress_opt: Option<&Progress>, -) -> (Vec, Vec, Option) { +) -> ( + Vec, + Vec, + Option, +) { if splits.is_empty() { return (Vec::new(), Vec::new(), None); } @@ -438,8 +442,9 @@ pub(crate) async fn delete_split_files( Ok(()) => (splits, Vec::new(), None), Err(bulk_err) => { let success_paths: HashSet<&PathBuf> = bulk_err.successes.iter().collect(); - let (succeeded, failed) = - splits.into_iter().partition(|s| success_paths.contains(&s.path)); + let (succeeded, failed) = splits + .into_iter() + .partition(|s| success_paths.contains(&s.path)); (succeeded, failed, Some(bulk_err)) } } @@ -478,8 +483,14 @@ pub async fn delete_splits_from_storage_and_metastore( let (succeeded_stds, failed_stds, storage_err) = delete_split_files(&*storage, splits_to_delete, progress_opt).await; - let successes: Vec = succeeded_stds.iter().map(|s| split_infos[&s.path].clone()).collect(); - let storage_failures: Vec = failed_stds.iter().map(|s| split_infos[&s.path].clone()).collect(); + let successes: Vec = succeeded_stds + .iter() + .map(|s| split_infos[&s.path].clone()) + .collect(); + let storage_failures: Vec = failed_stds + .iter() + .map(|s| split_infos[&s.path].clone()) + .collect(); let mut storage_error: Option = None; if let Some(bulk_delete_error) = storage_err { diff --git a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs index 656a5edb656..a4b5ed37b09 100644 --- a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs @@ -301,8 +301,7 @@ async fn delete_marked_parquet_splits( // To detect if this is the last page, we check if the number of splits is less than the // limit. assert!(splits.len() <= DELETE_PARQUET_SPLITS_BATCH_SIZE); - let splits_to_delete_possibly_remaining = - splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE; + let splits_to_delete_possibly_remaining = splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE; // Set split after which to search for the next loop. let Some(last_split) = splits.last() else { @@ -368,7 +367,10 @@ async fn delete_parquet_splits_from_storage_and_metastore( let storage_failed: Vec = failed_stds .into_iter() - .map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes }) + .map(|s| ParquetSplitInfo { + split_id: s.split_id, + file_size_bytes: s.size_bytes, + }) .collect(); if succeeded_stds.is_empty() { @@ -381,8 +383,11 @@ async fn delete_parquet_splits_from_storage_and_metastore( index_uid: Some(index_uid.clone()), split_ids: ids_to_delete, }; - let metastore_result = - protect_future(progress_opt, metastore.delete_metrics_splits(delete_request)).await; + let metastore_result = protect_future( + progress_opt, + metastore.delete_metrics_splits(delete_request), + ) + .await; if let Some(progress) = progress_opt { progress.record_progress(); @@ -390,7 +395,10 @@ async fn delete_parquet_splits_from_storage_and_metastore( let succeeded: Vec = succeeded_stds .into_iter() - .map(|s| ParquetSplitInfo { split_id: s.split_id, file_size_bytes: s.size_bytes }) + .map(|s| ParquetSplitInfo { + split_id: s.split_id, + file_size_bytes: s.size_bytes, + }) .collect(); match metastore_result { diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index e95febbaacb..cc94429227d 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -113,32 +113,13 @@ impl GarbageCollector { counters: GarbageCollectorCounters::default(), } } - // if !deleted_file_entries.is_empty() { - // let num_deleted_splits = deleted_file_entries.len(); - // let num_deleted_bytes = deleted_file_entries - // .iter() - // .map(|entry| entry.file_size_bytes.as_u64() as usize) - // .sum::(); - // let deleted_files: HashSet<&Path> = deleted_file_entries - // .iter() - // .map(|deleted_entry| deleted_entry.file_name.as_path()) - // .take(5) - // .collect(); - // info!( - // num_deleted_splits = num_deleted_splits, - // "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, - // ); - // self.counters.num_deleted_files += num_deleted_splits; - // self.counters.num_deleted_bytes += num_deleted_bytes; fn record_gc_result(&mut self, result: &GcRunResult, split_type: &str) { self.counters.num_failed_splits += result.num_failed; if result.num_deleted_splits > 0 { info!( "Janitor deleted {:?} and {} other {} splits.", - result.sample_deleted_files, - result.num_deleted_splits, - split_type, + result.sample_deleted_files, result.num_deleted_splits, split_type, ); self.counters.num_deleted_files += result.num_deleted_splits; self.counters.num_deleted_bytes += result.num_deleted_bytes; @@ -222,22 +203,40 @@ impl GarbageCollector { .await; let tantivy_run_duration = tantivy_start.elapsed().as_secs(); - JANITOR_METRICS.gc_seconds_total.with_label_values(["tantivy"]).inc_by(tantivy_run_duration); + JANITOR_METRICS + .gc_seconds_total + .with_label_values(["tantivy"]) + .inc_by(tantivy_run_duration); let result = match gc_res { Ok(removal_info) => { self.counters.num_successful_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["success", "tantivy"]).inc(); + JANITOR_METRICS + .gc_runs + .with_label_values(["success", "tantivy"]) + .inc(); GcRunResult { num_deleted_splits: removal_info.removed_split_entries.len(), - num_deleted_bytes: removal_info.removed_split_entries.iter().map(|e| e.file_size_bytes.as_u64() as usize).sum(), + num_deleted_bytes: removal_info + .removed_split_entries + .iter() + .map(|e| e.file_size_bytes.as_u64() as usize) + .sum(), num_failed: removal_info.failed_splits.len(), - sample_deleted_files: removal_info.removed_split_entries.iter().take(5).map(|e| e.file_name.display().to_string()).collect(), + sample_deleted_files: removal_info + .removed_split_entries + .iter() + .take(5) + .map(|e| e.file_name.display().to_string()) + .collect(), } } Err(error) => { self.counters.num_failed_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["error", "tantivy"]).inc(); + JANITOR_METRICS + .gc_runs + .with_label_values(["error", "tantivy"]) + .inc(); error!(error=?error, "failed to run garbage collection"); GcRunResult::failed() } @@ -260,22 +259,36 @@ impl GarbageCollector { .await; let parquet_run_duration = parquet_start.elapsed().as_secs(); - JANITOR_METRICS.gc_seconds_total.with_label_values(["parquet"]).inc_by(parquet_run_duration); + JANITOR_METRICS + .gc_seconds_total + .with_label_values(["parquet"]) + .inc_by(parquet_run_duration); let result = match gc_res { Ok(removal_info) => { self.counters.num_successful_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["success", "parquet"]).inc(); + JANITOR_METRICS + .gc_runs + .with_label_values(["success", "parquet"]) + .inc(); GcRunResult { num_deleted_splits: removal_info.removed_split_count(), num_deleted_bytes: removal_info.removed_bytes() as usize, num_failed: removal_info.failed_split_count(), - sample_deleted_files: removal_info.removed_parquet_splits_entries.iter().take(5).map(|e| format!("{}.parquet", e.split_id)).collect(), + sample_deleted_files: removal_info + .removed_parquet_splits_entries + .iter() + .take(5) + .map(|e| format!("{}.parquet", e.split_id)) + .collect(), } } Err(error) => { self.counters.num_failed_gc_run += 1; - JANITOR_METRICS.gc_runs.with_label_values(["error", "parquet"]).inc(); + JANITOR_METRICS + .gc_runs + .with_label_values(["error", "parquet"]) + .inc(); error!(error=?error, "failed to run parquet garbage collection"); GcRunResult::failed() } @@ -865,15 +878,13 @@ mod tests { let storage_resolver = StorageResolver::unconfigured(); let mut mock = MockMetastoreService::new(); - mock.expect_list_indexes_metadata() - .times(1) - .returning(|_| { - let indexes = vec![IndexMetadata::for_test( - "otel-metrics-v0_1", - "ram://indexes/otel-metrics-v0_1", - )]; - Ok(ListIndexesMetadataResponse::for_test(indexes)) - }); + mock.expect_list_indexes_metadata().times(1).returning(|_| { + let indexes = vec![IndexMetadata::for_test( + "otel-metrics-v0_1", + "ram://indexes/otel-metrics-v0_1", + )]; + Ok(ListIndexesMetadataResponse::for_test(indexes)) + }); let marked_split = MetricsSplitRecord { state: MetricsSplitState::MarkedForDeletion, @@ -903,10 +914,8 @@ mod tests { Ok(EmptyResponse {}) }); - let garbage_collect_actor = GarbageCollector::new( - MetastoreServiceClient::from_mock(mock), - storage_resolver, - ); + let garbage_collect_actor = + GarbageCollector::new(MetastoreServiceClient::from_mock(mock), storage_resolver); let universe = Universe::with_accelerated_time(); let (_mailbox, handle) = universe.spawn_builder().spawn(garbage_collect_actor); diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index c93f9a02242..bef73160377 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -18,7 +18,7 @@ use quickwit_actors::{Mailbox, Universe}; use quickwit_common::pubsub::EventBroker; use quickwit_config::NodeConfig; use quickwit_indexing::actors::MergeSchedulerService; -use quickwit_metastore::{SplitInfo}; +use quickwit_metastore::SplitInfo; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::SearchJobPlacer; use quickwit_storage::StorageResolver; diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index c2093e88d3c..c95dcda9da0 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -13,9 +13,7 @@ // limitations under the License. use once_cell::sync::Lazy; -use quickwit_common::metrics::{ - IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec, -}; +use quickwit_common::metrics::{IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec}; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index e37f1e56181..76400496fa0 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -95,7 +95,7 @@ pub async fn run_execute_retention_policy( } /// Detect all expired parquet splits based on a retention policy and -/// mark them as `MarkedForDeletion`. +/// mark them as `MarkedForDeletion`. pub async fn run_execute_parquet_retention_policy( index_uid: &IndexUid, metastore: MetastoreServiceClient, @@ -133,12 +133,12 @@ pub async fn run_execute_parquet_retention_policy( expired_split_ids.len() ); - ctx.protect_future( - metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest { + ctx.protect_future(metastore.mark_metrics_splits_for_deletion( + MarkMetricsSplitsForDeletionRequest { index_uid: Some(index_uid.clone()), split_ids: expired_split_ids, - }), - ) + }, + )) .await?; Ok(expired_splits.len()) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index f9088c8d675..5ea7e201548 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -862,13 +862,15 @@ impl FileBackedIndex { // Sort by split_id for stable pagination (mirrors Postgres ORDER BY split_id ASC). splits.sort_unstable_by(|a, b| { - a.metadata.split_id.as_str().cmp(b.metadata.split_id.as_str()) + a.metadata + .split_id + .as_str() + .cmp(b.metadata.split_id.as_str()) }); // Apply cursor: skip splits up to and including after_split_id. let splits = if let Some(ref after) = query.after_split_id { - let pos = - splits.partition_point(|s| s.metadata.split_id.as_str() <= after.as_str()); + let pos = splits.partition_point(|s| s.metadata.split_id.as_str() <= after.as_str()); &splits[pos..] } else { &splits[..] diff --git a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs index 5cab2f96a87..4e56f3c9474 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs @@ -47,9 +47,9 @@ pub const OTEL_METRICS_INDEX_ID: &str = "otel-metrics-v0_9"; /// directly and queries via DataFusion. The doc mapping is unused; this config /// exists only so the index can be registered in the metastore for source /// assignment and lifecycle management. -/// -/// TODO: As a temporary hack, we are including a timestamp_field, so that -/// we can pass the retention policy validation. +/// +/// TODO: As a temporary hack, we are including a timestamp_field, so that +/// we can pass the retention policy validation. const OTEL_METRICS_INDEX_CONFIG: &str = r#" version: 0.8 @@ -554,7 +554,9 @@ mod tests { let index_config = OtlpGrpcMetricsService::index_config(&Uri::for_test("ram:///indexes")).unwrap(); assert_eq!(index_config.index_id, OTEL_METRICS_INDEX_ID); - let retention = index_config.retention_policy_opt.expect("retention policy should be set"); + let retention = index_config + .retention_policy_opt + .expect("retention policy should be set"); assert_eq!( retention.retention_period().unwrap(), std::time::Duration::from_secs(30 * 24 * 3600) From 1d0846727081bb42e12d8ac7841fdcf85c3a9156 Mon Sep 17 00:00:00 2001 From: Matthew Kim Date: Tue, 31 Mar 2026 10:47:27 -0400 Subject: [PATCH 7/7] appease linter --- .../src/actors/garbage_collector.rs | 4 ++-- .../file_backed/file_backed_index/mod.rs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index cc94429227d..8fe266d7107 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; -use std::path::Path; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -336,6 +335,7 @@ impl Handler for GarbageCollector { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::ops::Bound; use std::path::Path; use std::sync::Arc; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 5ea7e201548..eb86c84a970 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -1005,17 +1005,17 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp } // Filter by update timestamp - if let Some(update_timestamp_lte) = query.update_timestamp_lte { - if split.update_timestamp > update_timestamp_lte { - return false; - } + if let Some(update_timestamp_lte) = query.update_timestamp_lte + && split.update_timestamp > update_timestamp_lte + { + return false; } // Filter by max time_range_end (retention policy: splits whose data ends before cutoff) - if let Some(max_time_range_end) = query.max_time_range_end { - if (split.metadata.time_range.end_secs as i64) > max_time_range_end { - return false; - } + if let Some(max_time_range_end) = query.max_time_range_end + && (split.metadata.time_range.end_secs as i64) > max_time_range_end + { + return false; } true