Skip to content
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 73 additions & 8 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,26 +660,41 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let mut index_service = IndexService::new(metastore, storage_resolver);

if quickwit_common::is_metrics_index(&args.index_id) {
let removal_info = index_service
.garbage_collect_parquet_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
return print_parquet_gc_result(args.dry_run, removal_info);
}

let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
print_tantivy_gc_result(args.dry_run, removal_info)
}

fn print_tantivy_gc_result(
dry_run: bool,
removal_info: quickwit_index_management::SplitRemovalInfo,
) -> anyhow::Result<()> {
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
println!("No dangling files to garbage collect.");
return Ok(());
}

if args.dry_run {
if dry_run {
println!("The following files will be garbage collected.");
for split_info in removal_info.removed_split_entries {
println!(" - {}", split_info.file_name.display());
for entry in &removal_info.removed_split_entries {
println!(" - {}", entry.file_name.display());
}
return Ok(());
}

if !removal_info.failed_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split_info in &removal_info.failed_splits {
println!(" - {}", split_info.split_id);
for split in &removal_info.failed_splits {
println!(" - {}", split.split_id);
}
println!(
"{} Splits were unable to be removed.",
Expand All @@ -690,7 +705,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let deleted_bytes: u64 = removal_info
.removed_split_entries
.iter()
.map(|split_info| split_info.file_size_bytes.as_u64())
.map(|s| s.file_size_bytes.as_u64())
.sum();
println!(
"{}MB of storage garbage collected.",
Expand All @@ -702,9 +717,59 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_split_entries.is_empty()
&& !removal_info.failed_splits.is_empty()
} else if removal_info.removed_split_entries.is_empty() {
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
println!(
"{} Index partially garbage collected.",
"✘".color(RED_COLOR)
);
}

Ok(())
}

fn print_parquet_gc_result(
dry_run: bool,
removal_info: quickwit_index_management::ParquetSplitRemovalInfo,
) -> anyhow::Result<()> {
if removal_info.removed_parquet_splits_entries.is_empty()
&& removal_info.failed_parquet_splits.is_empty()
{
println!("No dangling files to garbage collect.");
return Ok(());
}

if dry_run {
println!("The following files will be garbage collected.");
for entry in &removal_info.removed_parquet_splits_entries {
println!(" - {}.parquet", entry.split_id);
}
return Ok(());
}

if !removal_info.failed_parquet_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split in &removal_info.failed_parquet_splits {
println!(" - {}", split.split_id);
}
println!(
"{} Splits were unable to be removed.",
removal_info.failed_parquet_splits.len()
);
}

println!(
"{}MB of storage garbage collected.",
removal_info.removed_bytes() / 1_000_000
);

if removal_info.failed_parquet_splits.is_empty() {
println!(
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_parquet_splits_entries.is_empty() {
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
println!(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-index-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-parquet-engine = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }

Expand Down
120 changes: 80 additions & 40 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct GcMetrics {
pub failed_splits: IntCounter,
}

trait RecordGcMetrics {
pub(crate) trait RecordGcMetrics {
fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
}

Expand All @@ -72,7 +72,7 @@ pub struct DeleteSplitsError {
metastore_failures: Vec<SplitInfo>,
}

async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
pub(crate) async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
where Fut: Future<Output = T> {
match progress {
None => future.await,
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn list_splits_metadata(

/// In order to avoid hammering the load on the metastore, we can throttle the rate of split
/// deletion by setting this environment variable.
fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
pub(crate) fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
static MAX_SPLIT_DELETION_RATE_PER_SEC: OnceLock<Option<usize>> = OnceLock::new();
*MAX_SPLIT_DELETION_RATE_PER_SEC.get_or_init(|| {
quickwit_common::get_from_env_opt::<usize>("QW_MAX_SPLIT_DELETION_RATE_PER_SEC", false)
Expand Down Expand Up @@ -408,6 +408,48 @@ async fn delete_splits_marked_for_deletion_several_indexes(
split_removal_info
}

/// A split normalized for storage deletion: just the id, path, and size.
/// Used as the common currency between tantivy and parquet GC paths.
pub(crate) struct SplitToDelete {
// Metastore identifier of the split; reported back to the caller on success/failure.
pub split_id: String,
// Path of the split file handed to `Storage::bulk_delete`.
pub path: PathBuf,
// Size of the split file in bytes.
pub size_bytes: u64,
}

/// Deletes split files from storage and partitions into (succeeded, failed).
///
/// Returns the `BulkDeleteError` if there was a partial failure, so the caller
/// can log it with index-specific context. Does NOT touch the metastore.
pub(crate) async fn delete_split_files(
    storage: &dyn Storage,
    splits: Vec<SplitToDelete>,
    progress_opt: Option<&Progress>,
) -> (
    Vec<SplitToDelete>,
    Vec<SplitToDelete>,
    Option<BulkDeleteError>,
) {
    // Nothing to delete: report empty success/failure sets and no error.
    if splits.is_empty() {
        return (Vec::new(), Vec::new(), None);
    }
    let split_paths: Vec<&Path> = splits.iter().map(|split| split.path.as_path()).collect();
    let delete_outcome = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;

    if let Some(progress) = progress_opt {
        progress.record_progress();
    }
    let bulk_delete_error = match delete_outcome {
        // Full success: every input split was deleted from storage.
        Ok(()) => return (splits, Vec::new(), None),
        Err(error) => error,
    };
    // Partial failure: separate the inputs using the set of paths that the
    // storage layer reports as successfully deleted.
    let deleted_paths: HashSet<&PathBuf> = bulk_delete_error.successes.iter().collect();
    let mut succeeded_splits = Vec::new();
    let mut failed_splits = Vec::new();
    for split in splits {
        if deleted_paths.contains(&split.path) {
            succeeded_splits.push(split);
        } else {
            failed_splits.push(split);
        }
    }
    (succeeded_splits, failed_splits, Some(bulk_delete_error))
}

/// Delete a list of splits from the storage and the metastore.
/// It should leave the index and the metastore in good state.
///
Expand All @@ -424,49 +466,47 @@ pub async fn delete_splits_from_storage_and_metastore(
progress_opt: Option<&Progress>,
) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());

for split in splits {
let split_info = split.as_split_info();
split_infos.insert(split_info.file_name.clone(), split_info);
}
let split_paths = split_infos
.keys()
.map(|split_path_buf| split_path_buf.as_path())
.collect::<Vec<&Path>>();
let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;

if let Some(progress) = progress_opt {
progress.record_progress();
}
let mut successes = Vec::with_capacity(split_infos.len());
let splits_to_delete: Vec<SplitToDelete> = split_infos
.values()
.map(|info| SplitToDelete {
split_id: info.split_id.clone(),
path: info.file_name.clone(),
size_bytes: info.file_size_bytes.as_u64(),
})
.collect();

let (succeeded_stds, failed_stds, storage_err) =
delete_split_files(&*storage, splits_to_delete, progress_opt).await;

let successes: Vec<SplitInfo> = succeeded_stds
.iter()
.map(|s| split_infos[&s.path].clone())
.collect();
let storage_failures: Vec<SplitInfo> = failed_stds
.iter()
.map(|s| split_infos[&s.path].clone())
.collect();

let mut storage_error: Option<BulkDeleteError> = None;
let mut storage_failures = Vec::new();

match delete_result {
Ok(_) => successes.extend(split_infos.into_values()),
Err(bulk_delete_error) => {
let success_split_paths: HashSet<&PathBuf> =
bulk_delete_error.successes.iter().collect();
for (split_path, split_info) in split_infos {
if success_split_paths.contains(&split_path) {
successes.push(split_info);
} else {
storage_failures.push(split_info);
}
}
let failed_split_paths = storage_failures
.iter()
.map(|split_info| split_info.file_name.as_path())
.collect::<Vec<_>>();
error!(
error=?bulk_delete_error.error,
index_id=index_uid.index_id,
"failed to delete split file(s) {:?} from storage",
PrettySample::new(&failed_split_paths, 5),
);
storage_error = Some(bulk_delete_error);
}
};
if let Some(bulk_delete_error) = storage_err {
let failed_split_paths = storage_failures
.iter()
.map(|split_info| split_info.file_name.as_path())
.collect::<Vec<_>>();
error!(
error=?bulk_delete_error.error,
index_id=index_uid.index_id,
"failed to delete split file(s) {:?} from storage",
PrettySample::new(&failed_split_paths, 5),
);
storage_error = Some(bulk_delete_error);
}

if !successes.is_empty() {
let split_ids: Vec<SplitId> = successes
.iter()
Expand Down
42 changes: 42 additions & 0 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::garbage_collection::{
DeleteSplitsError, SplitRemovalInfo, delete_splits_from_storage_and_metastore,
run_garbage_collect,
};
use crate::parquet_garbage_collection::{ParquetSplitRemovalInfo, run_parquet_garbage_collect};

#[derive(Error, Debug)]
pub enum IndexServiceError {
Expand Down Expand Up @@ -405,6 +406,47 @@ impl IndexService {
Ok(deleted_entries)
}

/// Detect all dangling parquet splits and associated files from a metrics index
/// and removes them.
///
/// * `index_id` - The target metrics index Id.
/// * `grace_period` - Threshold period after which a staged split can be garbage collected.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn garbage_collect_parquet_index(
    &mut self,
    index_id: &str,
    grace_period: Duration,
    dry_run: bool,
) -> anyhow::Result<ParquetSplitRemovalInfo> {
    let metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
    let index_metadata = self
        .metastore
        .index_metadata(metadata_request)
        .await?
        .deserialize_index_metadata()?;
    let index_uid = index_metadata.index_uid.clone();
    let index_uri = index_metadata.into_index_config().index_uri;
    let storage = self.storage_resolver.resolve(&index_uri).await?;

    // Single-index GC: map this index's uid to its resolved storage.
    let mut index_storages = HashMap::new();
    index_storages.insert(index_uid, storage);

    let removal_info = run_parquet_garbage_collect(
        index_storages,
        self.metastore.clone(),
        grace_period,
        // deletion_grace_period of zero, so that a cli call directly deletes splits after
        // marking to be deleted.
        Duration::ZERO,
        dry_run,
        None,
        None,
    )
    .await?;

    Ok(removal_info)
}

/// Clears the index by applying the following actions:
/// - mark all splits for deletion in the metastore.
/// - delete the files of all splits marked for deletion using garbage collection.
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-index-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

mod garbage_collection;
mod index;
mod parquet_garbage_collection;

pub use garbage_collection::{GcMetrics, run_garbage_collect};
pub use garbage_collection::{GcMetrics, SplitRemovalInfo, run_garbage_collect};
pub use index::{IndexService, IndexServiceError, clear_cache_directory, validate_storage_uri};
pub use parquet_garbage_collection::{
ParquetSplitInfo, ParquetSplitRemovalInfo, run_parquet_garbage_collect,
};
Loading
Loading