diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index b26853288..3755f20e1 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -16,15 +16,7 @@ * */ -use std::{ - collections::HashSet, - path::Path, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - time::Duration, -}; +use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; use async_trait::async_trait; use bytes::Bytes; @@ -271,50 +263,49 @@ impl BlobStore { tenant_id: &Option, ) -> Result<(), ObjectStorageError> { let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let files_scanned = Arc::new(AtomicU64::new(0)); - let files_deleted = Arc::new(AtomicU64::new(0)); + + // List all objects under the prefix let object_stream = self.client.list(Some(&(key.into()))); increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string(), tenant); - object_stream - .for_each_concurrent(None, |x| async { - files_scanned.fetch_add(1, Ordering::Relaxed); - - match x { - Ok(obj) => { - files_deleted.fetch_add(1, Ordering::Relaxed); - let delete_resp = self.client.delete(&obj.location).await; - increment_object_store_calls_by_date( - "DELETE", - &Utc::now().date_naive().to_string(), - tenant, - ); - if delete_resp.is_err() { - error!( - "Failed to delete object during delete stream: {:?}", - delete_resp - ); - } - } - Err(err) => { - error!("Failed to fetch object during delete stream: {:?}", err); - } - }; - }) - .await; + // Map listed objects to their locations for delete_stream + let locations_stream = object_stream.map(|result| result.map(|obj| obj.location)); + + // Use delete_stream which batches deletes via the bulk delete API + let mut delete_results = self.client.delete_stream(Box::pin(locations_stream)); + + let mut files_deleted: u64 = 0; + let mut failed_deletes: u64 = 0; + while let Some(result) = delete_results.next().await { + match result { + Ok(_) => files_deleted += 1, + Err(err) => { + failed_deletes += 1; + error!("Failed to delete object during prefix deletion: {:?}", err); + } + } + } + let total_files = files_deleted + failed_deletes; increment_files_scanned_in_object_store_calls_by_date( "LIST", - files_scanned.load(Ordering::Relaxed), + total_files, &Utc::now().date_naive().to_string(), tenant, ); increment_files_scanned_in_object_store_calls_by_date( "DELETE", - files_deleted.load(Ordering::Relaxed), + files_deleted, &Utc::now().date_naive().to_string(), tenant, ); + + if failed_deletes > 0 { + return Err(ObjectStorageError::UnhandledError( + anyhow::anyhow!("Failed to delete {failed_deletes} out of {total_files} objects under prefix '{key}'").into(), + )); + } + Ok(()) } diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index b89d595ae..46e76ad75 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -16,15 +16,7 @@ * */ -use std::{ - collections::HashSet, - path::Path, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - time::Duration, -}; +use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; use crate::{ metrics::{ @@ -234,51 +226,50 @@ impl Gcs { key: &str, tenant_id: &Option, ) -> Result<(), ObjectStorageError> { - let files_scanned = Arc::new(AtomicU64::new(0)); - let files_deleted = Arc::new(AtomicU64::new(0)); let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - // Track LIST operation + + // List all objects under the prefix let object_stream = self.client.list(Some(&(key.into()))); increment_object_store_calls_by_date("LIST", &Utc::now().date_naive().to_string(), tenant); - object_stream - .for_each_concurrent(None, |x| async { - files_scanned.fetch_add(1, Ordering::Relaxed); - - match x { - Ok(obj) => { - files_deleted.fetch_add(1, Ordering::Relaxed); - let delete_resp = self.client.delete(&obj.location).await; - increment_object_store_calls_by_date( - "DELETE", - &Utc::now().date_naive().to_string(), - tenant, - ); - if delete_resp.is_err() { - error!( - "Failed to delete object during delete stream: {:?}", - delete_resp - ); - } - } - Err(err) => { - error!("Failed to fetch object during delete stream: {:?}", err); - } - }; - }) - .await; + // Map listed objects to their locations for delete_stream + let locations_stream = object_stream.map(|result| result.map(|obj| obj.location)); + + // Use delete_stream which batches deletes via the bulk delete API + let mut delete_results = self.client.delete_stream(Box::pin(locations_stream)); + + let mut files_deleted: u64 = 0; + let mut failed_deletes: u64 = 0; + while let Some(result) = delete_results.next().await { + match result { + Ok(_) => files_deleted += 1, + Err(err) => { + failed_deletes += 1; + error!("Failed to delete object during prefix deletion: {:?}", err); + } + } + } + + let total_files = files_deleted + failed_deletes; increment_files_scanned_in_object_store_calls_by_date( "LIST", - files_scanned.load(Ordering::Relaxed), + total_files, &Utc::now().date_naive().to_string(), tenant, ); increment_files_scanned_in_object_store_calls_by_date( "DELETE", - files_deleted.load(Ordering::Relaxed), + files_deleted, &Utc::now().date_naive().to_string(), tenant, ); + + if failed_deletes > 0 { + return Err(ObjectStorageError::UnhandledError( + anyhow::anyhow!("Failed to delete {failed_deletes} out of {total_files} objects under prefix '{key}'").into(), + )); + } + Ok(()) } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 6577abe96..a0a092aa7 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -17,15 +17,7 @@ */ use std::{ - collections::HashSet, - fmt::Display, - path::Path, - str::FromStr, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - time::Duration, + collections::HashSet, fmt::Display, path::Path, str::FromStr, sync::Arc, time::Duration, }; use async_trait::async_trait; @@ -406,9 +398,8 @@ impl S3 { tenant_id: &Option, ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let files_scanned = Arc::new(AtomicU64::new(0)); - let files_deleted = Arc::new(AtomicU64::new(0)); - // Track LIST operation + + // List all objects under the prefix let object_stream = self.client.list(Some(&(key.into()))); increment_object_store_calls_by_date( "LIST", @@ -416,46 +407,44 @@ impl S3 { tenant_str, ); - let tenant_str_clone = tenant_str.to_string(); - object_stream - .for_each_concurrent(None, |x| async { - files_scanned.fetch_add(1, Ordering::Relaxed); - - match x { - Ok(obj) => { - files_deleted.fetch_add(1, Ordering::Relaxed); - let delete_resp = self.client.delete(&obj.location).await; - increment_object_store_calls_by_date( - "DELETE", - &Utc::now().date_naive().to_string(), - &tenant_str_clone, - ); - if delete_resp.is_err() { - error!( - "Failed to delete object during delete stream: {:?}", - delete_resp - ); - } - } - Err(err) => { - error!("Failed to fetch object during delete stream: {:?}", err); - } - }; - }) - .await; + // Map listed objects to their locations for delete_stream + let locations_stream = object_stream.map(|result| result.map(|obj| obj.location)); + + // Use delete_stream which batches deletes via the S3 DeleteObjects API + let mut delete_results = self.client.delete_stream(Box::pin(locations_stream)); + let mut files_deleted: u64 = 0; + let mut failed_deletes: u64 = 0; + while let Some(result) = delete_results.next().await { + match result { + Ok(_) => files_deleted += 1, + Err(err) => { + failed_deletes += 1; + error!("Failed to delete object during prefix deletion: {:?}", err); + } + } + } + + let total_files = files_deleted + failed_deletes; increment_files_scanned_in_object_store_calls_by_date( "LIST", - files_scanned.load(Ordering::Relaxed), + total_files, &Utc::now().date_naive().to_string(), tenant_str, ); increment_files_scanned_in_object_store_calls_by_date( "DELETE", - files_deleted.load(Ordering::Relaxed), + files_deleted, &Utc::now().date_naive().to_string(), tenant_str, ); + + if failed_deletes > 0 { + return Err(ObjectStorageError::UnhandledError( + anyhow::anyhow!("Failed to delete {failed_deletes} out of {total_files} objects under prefix '{key}'").into(), + )); + } + Ok(()) }